如何實時計算日累計逐單資金流

語言: CN / TW / HK

在股票交易市場,資金流是一個重要的量價指標。資金流指標按照是否對交易訂單號進行合併計算,可以分為 逐筆資金流逐單資金流 ;按照統計時間,可以分為 分鐘資金流日累計資金流 。其中 逐筆資金流 的處理邏輯比較簡單,直接對每一筆成交記錄的成交股數或者成交金額進行大小單的判斷,然後進行相關指標的計算。而 逐單資金流 相對複雜一些,需要先根據買賣訂單號進行合併,然後進行大小單的判斷和相關指標的計算。

關於實時計算 逐單分鐘資金流 的解決方案,可以參考教程:​ ​DolphinDB流計算在金融行業的應用:實時計算分鐘資金流​

本教程主要提供一種基於DolphinDB流資料處理框架,實時計算 日累計逐單資金流 的低延時解決方案。

注意:本教程後文提到的日累計資金流都是指日累計逐單資金流。

本教程包含日累計資金流場景描述、指標實現和實時計算結果展示等內容。

1. 日累計資金流場景描述

1.1 實時計算日累計資金流的難點

  • 日累計逐單資金流計算中的大小單是一個動態的概念,一個小單在成交量增加後可能變成一個大單。
  • 日累計逐單資金流的計算過程中涉及歷史狀態,如若不能實現增量計算,當計算下午的資料時,可能需要回溯有關這筆訂單上午的資料,效率會非常低下。
  • 該場景需要對每一筆成交記錄做出一次響應,計算出每隻股票截止當前成交記錄的最新日累計資金流指標,實時計算壓力較大。
  • 計算涉及至少兩個階段:在第一階段需要根據訂單分組,根據訂單的累計成交量判斷大小單;在第二階段要根據股票來分組,統計每個股票的大小單數量及成交額。
  • 實時流計算場景中的低延時要求。

1.2 逐筆成交資料

本教程基於上交所2020年某日的逐筆成交資料進行程式碼除錯,在DolphinDB中儲存的表結構為:

name

typeString

comment

SecurityID

SYMBOL

股票程式碼

Market

SYMBOL

交易所

TradeTime

TIMESTAMP

交易時間

TradePrice

DOUBLE

交易價格

TradeQty

INT

成交量

TradeAmount

DOUBLE

成交額

BuyNum

INT

買單訂單號

SellNum

INT

賣單訂單號

1.3 日累計資金流指標

本教程示例程式碼計算的日累計資金流指標為:

指標名稱

含義

TotalAmount

從開盤到當前記錄,總成交額

SellSmallAmount

從開盤到當前記錄,賣方向小單的總成交額,成交股數小於等於2萬股

SellMediumAmount

從開盤到當前記錄,賣方向中單的總成交額,成交股數大於2萬股、小於等於20萬股

SellBigAmount

從開盤到當前記錄,賣方向大單的總成交額,成交股數大於20萬股

SellSmallCount

從開盤到當前記錄,賣方向小單的總訂單數,成交股數小於等於2萬股

SellMediumCount

從開盤到當前記錄,賣方向中單的總訂單數,成交股數大於2萬股、小於等於20萬股

SellBigCount

從開盤到當前記錄,賣方向大單的總訂單數,成交股數大於20萬股

BuySmallAmount

從開盤到當前記錄,買方向小單的總成交額,成交股數小於等於2萬股

BuyMediumAmount

從開盤到當前記錄,買方向中單的總成交額,成交股數大於2萬股、小於等於20萬股

BuyBigAmount

從開盤到當前記錄,買方向大單的總成交額,成交股數大於20萬股

BuySmallCount

從開盤到當前記錄,買方向小單的總訂單數,成交股數小於等於2萬股

BuyMediumCount

從開盤到當前記錄,買方向中單的總訂單數,成交股數大於2萬股、小於等於20萬股

BuyBigCount

從開盤到當前記錄,買方向大單的總訂單數,成交股數大於20萬股

關於資金流大小單的劃分規則,不同的開發者會有不同的定義方法。以常用的股票行情軟體為例:

(1)​ ​東方財富​

  • 超級大單:>50萬股或100萬元
  • 大單:10-50萬股或20-100萬元
  • 中單:2-10萬股或4-20萬元
  • 小單:<2萬股或4萬元

(2)​​新浪財經​​

  • 特大單:>100萬元
  • 大單:20-100萬元
  • 小單:5-20萬元
  • 散單:<5萬元

包括大智慧、同花順等,不同軟體之間的大小單區分規則都會有差異,但是判斷條件都是基於成交股數或成交金額。

注意:本教程中,資金流大小單的判斷條件基於成交股數,劃分了大單、中單、小單三種,判斷的邊界值是隨機定義的,開發者必須根據自己的實際場景進行調整。

1.4 日累計資金流增量計算方案

日累計逐單資金流的增量計算包括兩個步驟。首先是計算每個買單或賣單的累計成交量,據此判斷訂單是大單,中單或小單。這一步的增量計算實現比較簡單,只要按訂單分組,並用cumsum計算累計的成交量。在此基礎上,進一步按股票統計大小單的數量和交易金額等指標。這一步如果沒有實現增量計算,那麼每次統計大中小單的數量的耗時會越來越長,因為訂單數量在不斷的增加。事實上,如果我們能夠獲得某一訂單當前時刻的狀態(大單、中單、小單等)以及前一個時刻的狀態,第二步的增量計算就非常簡單。

處理流程圖說明:

  • tradeOriginalStream是DolphinDB中的流資料表,用於接收實時資料來源的資料併發布給流計算引擎進行實時計算。
  • capitalFlowStream是DolphinDB中的流資料表,用於實時接收流計算引擎的計算結果,其資料可以被外部消費者訂閱消費。
  • ​parallel​ ​​引數是指流計算的並行度,本教程中把逐筆成交表​ ​tradeOriginalStream​ ​​中的資料對​ ​SecurityID​ ​​欄位(股票程式碼)按照雜湊演算法,相對均勻地釋出到​ ​parallel​ ​個響應式狀態引擎1實現平行計算。因為逐筆成交表的資料流量較大,且日累計逐單資金流指標的計算相對複雜,所以需要使用並行流處理。
  • 響應式狀態引擎1結合內建的​ ​cumsum​ ​​,​ ​prev​ ​函式,增量計算當前訂單根據股票程式碼和買單訂單號分組後的累計成交金額,以及當前訂單合入前後的大小單標籤、累計成交量,更詳細的計算邏輯介在第2章的程式碼開發部分說明。
  • 響應式狀態引擎2結合內建的​ ​cumsum​ ​​,​ ​prev​ ​函式,增量計算當前訂單根據股票程式碼和賣單訂單號分組後的累計成交金額,以及當前訂單合入前後的大小單標籤、累計成交量,同時保留上一步買方向的中間計算結果,更詳細的計算邏輯會在第2章的程式碼開發部分說明。
  • 響應式狀態引擎3結合內建的​ ​cumsum​ ​​,​ ​dynamicGroupCumsum​ ​​,​ ​dynamicGroupCumcount​ ​函式實現根據股票程式碼合併的資金流指標的增量計算,更詳細的計算邏輯會在第2章的程式碼開發部分說明。

2. 日累計資金流指標實現

本教程程式碼開發工具採用​ ​DolphinDB GUI​ ​,所有程式碼均可在DolphinDB GUI客戶端開發工具執行。

2.1 建立相關流資料表

def createStreamTableFunc(){
  //create stream table: tradeOriginalStream
  colName = `SecurityID`Market`TradeTime`TradePrice`TradeQty`TradeAmount`BuyNum`SellNum
  colType = [SYMBOL, SYMBOL, TIMESTAMP, DOUBLE, INT, DOUBLE, INT, INT]
  tradeOriginalStreamTemp = streamTable(20000000:0, colName, colType)
  try{ enableTableShareAndPersistence(table=tradeOriginalStreamTemp, tableName="tradeOriginalStream", asynWrite=true, compress=true, cacheSize=20000000, retentionMinutes=1440, flushMode=0, preCache=10000) }
  catch(ex){ print(ex) }
  undef("tradeOriginalStreamTemp")
  
  //create stream table: capitalFlow
  colName = `SecurityID`TradeTime`TotalAmount`SellSmallAmount`SellMediumAmount`SellBigAmount`SellSmallCount`SellMediumCount`SellBigCount`BuySmallAmount`BuyMediumAmount`BuyBigAmount`BuySmallCount`BuyMediumCount`BuyBigCount
  colType =  [SYMBOL, TIMESTAMP, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, INT, INT, DOUBLE, DOUBLE, DOUBLE, INT, INT, INT]
  capitalFlowStreamTemp = streamTable(20000000:0, colName, colType)
  try{ enableTableShareAndPersistence(table=capitalFlowStreamTemp, tableName="capitalFlowStream", asynWrite=true, compress=true, cacheSize=20000000, retentionMinutes=1440, flushMode=0, preCache=10000) }
  catch(ex){ print(ex) }
  undef("capitalFlowStreamTemp")
  
  //create stream table: capitalFlowStream60min
  colName = `TradeTime`SecurityID`TotalAmount`SellSmallAmount`SellMediumAmount`SellBigAmount`SellSmallCount`SellMediumCount`SellBigCount`BuySmallAmount`BuyMediumAmount`BuyBigAmount`BuySmallCount`BuyMediumCount`BuyBigCount
  colType =  [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, INT, INT, DOUBLE, DOUBLE, DOUBLE, INT, INT, INT]
  capitalFlowStream60minTemp = streamTable(1000000:0, colName, colType)
  try{ enableTableShareAndPersistence(table=capitalFlowStream60minTemp, tableName="capitalFlowStream60min", asynWrite=true, compress=true, cacheSize=1000000, retentionMinutes=1440, flushMode=0, preCache=10000) }
  catch(ex){ print(ex) }
  undef("capitalFlowStreamTemp")
}

createStreamTableFunc()
go
setStreamTableFilterColumn(tradeOriginalStream, `SecurityID)
  • ​go​ ​語句的作用是對程式碼分段進行解析和執行。
  • ​setStreamTableFilterColumn​ ​​函式作用是指定流資料表的過濾列,與​ ​subscribeTable​ ​​函式的​ ​filter​ ​ 引數配合使用。本教程中的作用是把逐筆成交表中的資料對股票程式碼按照雜湊演算法,相對均勻地釋出到不同的流處理執行緒消費,實現平行計算的目的。

2.2 定義資金流大小單判斷的函式

/* 
 * Label small, medium and large order
 * small : 0
 * medium : 1
 * large : 2
 */
@state
def tagFunc(qty){
    return iif(qty <= 20000, 0, iif(qty <= 200000 and qty > 20000, 1, 2))
}
  • 成交股數小於等於2萬股的訂單標記為小單,標籤為0;成交股數大於2萬股、小於等於20萬股的訂單標記為中單,標籤為1;成交股數大於20萬股的訂單標記為大單,標籤為2。本教程中,資金流大小單的判斷條件基於成交股數,劃分了大單、中單、小單三種,判斷的邊界值是隨機定義的,開發者必須根據自己的實際場景進行調整。
  • 該函式將在響應式狀態引擎中使用,所以需要用 @state 表示函式是自定義的狀態函式。

2.3 根據股票和買單訂單號合併的增量計算

def processBuyOrderFunc(parallel){
  metricsBuy = [
    <TradeTime>,
    <SellNum>,
    <TradeAmount>,
    <TradeQty>,
    <cumsum(TradeAmount)>,
    <tagFunc(cumsum(TradeQty))>,
    <prev(cumsum(TradeAmount))>,
    <prev(tagFunc(cumsum(TradeQty)))>]
  for(i in 1..parallel){
    createReactiveStateEngine(name="processBuyOrder"+string(i), metrics=metricsBuy, dummyTable=tradeOriginalStream, outputTable=getStreamEngine("processSellOrder"+string(i)), keyColumn=`SecurityID`BuyNum, keepOrder=true)
    subscribeTable(tableName="tradeOriginalStream", actionName="processBuyOrder"+string(i), offset=-1, handler=getStreamEngine("processBuyOrder"+string(i)), msgAsTable=true, hash=i, filter=(parallel, i-1))
  }
}
  • ​parallel​ ​​引數是指流計算的並行度,上述程式碼中是把逐筆成交表​ ​tradeOriginalStream​ ​​中的資料對股票程式碼按照雜湊演算法,相對均勻地釋出到​ ​parallel​ ​個響應式狀態引擎1實現平行計算。這些響應式狀態引擎1的計算邏輯相同,但是處理的股票不同。
  • 上述程式碼中通過DolphinDB的響應式狀態引擎和內建的​ ​cumsum​ ​​,​ ​prev​ ​​函式實現流式增量計算,分組欄位為​ ​SecurityID​ ​​和​ ​BuyNum​ ​,即股票程式碼和 買單訂單號
  • ​metricsBuy​ ​中的內容為響應式狀態引擎中以元程式碼形式表示的計算公式:
metricsBuy = [
    <TradeTime>,
    <SellNum>,
    <TradeAmount>,
    <TradeQty>,
    <cumsum(TradeAmount)>,
    <tagFunc(cumsum(TradeQty))>,
    <prev(cumsum(TradeAmount))>,
    <prev(tagFunc(cumsum(TradeQty)))>]

​<TradeTime>​ ​​, ​ ​<SellNum>​ ​​, ​ ​<TradeAmount>​ ​​, ​ ​<TradeQty>​ ​是 無狀態 的計算,作用是保留原始表中這些欄位的原始資訊,輸入給下一層的響應式狀態引擎計算使用。​ ​<cumsum(TradeAmount)>​ ​​, ​ ​<tagFunc(cumsum(TradeQty))>​ ​​, ​ ​<prev(cumsum(TradeAmount))>​ ​​, ​ ​<prev(tagFunc(cumsum(TradeQty)))>​ ​是 有狀態 的計算,分別計算了每一條成交記錄所代表的股票按照此記錄的 買單訂單號 合併後的累計成交金額、當前成交記錄合入後根據累計成交量判斷的大小單標籤、當前成交記錄合入前的累計成交金額、當前成交記錄合入前根據累計成交量判斷的大小單標籤,作用是作為第三層響應式狀態引擎中的​ ​dynamicGroupCumsum​ ​​, ​ ​dynamicGroupCumcount​ ​函式的輸入,增量計算買方向的資金流指標。這些有狀態因子的計算都是通過 流式增量計算 的方法實現的。

為了方便開發者快速理解這塊程式碼的計算邏輯,下面我們輸入一些樣本資料來觀察第一層響應式狀態引擎的執行:

  • 逐筆成交表​ ​tradeOriginalStream​ ​中寫入5條資料

  • 經過第一層響應式狀態引擎的處理後,輸出為

上述程式碼對股票程式碼為​ ​60000​ ​的逐筆成交資料按照 買單訂單號​69792​ ​​進行合併計算,在響應式狀態引擎中對每一筆輸入都會進行一次響應計算,所以輸出結果的條數和輸入記錄的條數相等。結果表中的​ ​TotalBuyAmount​ ​​, ​ ​BuyOrderFlag​ ​​, ​ ​PrevTotalBuyAmount​ ​​, ​ ​PrevBuyOrderFlag​ ​分別代表每一條成交記錄所代表的股票按照此記錄的 買單訂單號 合併後的累計成交金額、當前成交記錄合入後根據累計成交量判斷的大小單標籤、當前成交記錄合入前的累計成交金額、當前成交記錄合入前根據累計成交量判斷的大小單標籤,這些有狀態因子的計算都是通過 流式增量計算 的方法實現的。

2.4 根據股票和賣單訂單號合併的增量計算

def processSellOrderFunc(parallel){
  colName = `SecurityID`BuyNum`TradeTime`SellNum`TradeAmount`TradeQty`TotalBuyAmount`BuyOrderFlag`PrevTotalBuyAmount`PrevBuyOrderFlag
  colType =  [SYMBOL, INT, TIMESTAMP, INT, DOUBLE, INT, DOUBLE, INT, DOUBLE, INT]
  processBuyOrder = table(1:0, colName, colType)
  metricsSell = [
    <TradeTime>,
    <TradeAmount>,
    <cumsum(TradeAmount)>,
    <tagFunc(cumsum(TradeQty))>,
    <prev(cumsum(TradeAmount))>,
    <prev(tagFunc(cumsum(TradeQty)))>,
    <BuyNum>,
    <TotalBuyAmount>,
    <BuyOrderFlag>,
    <PrevTotalBuyAmount>,
    <PrevBuyOrderFlag>]
  for(i in 1..parallel){
    createReactiveStateEngine(name="processSellOrder"+string(i), metrics=metricsSell, dummyTable=processBuyOrder, outputTable=getStreamEngine("processCapitalFlow"+string(i)), keyColumn=`SecurityID`SellNum, keepOrder=true)
  }
}
  • ​parallel​ ​​引數是指流計算的並行度,上述程式碼中是建立了​ ​parallel​ ​​個響應式狀態引擎2,這些響應式狀態引擎2的輸入是對應的​ ​parallel​ ​個響應式狀態引擎1的輸出,實現平行計算。這些響應式狀態引擎2的計算邏輯相同,但是處理的股票不同。
  • 上述程式碼中通過DolphinDB的響應式狀態引擎和內建的​ ​cumsum​ ​​,​ ​prev​ ​​函式實現流式增量計算,分組欄位為​ ​SecurityID​ ​​和​ ​SellNum​ ​,即股票程式碼和 賣單訂單號
  • ​metricsSell​ ​中的內容為響應式狀態引擎中以元程式碼形式表示的計算公式:
metricsSell = [
    <TradeTime>,
    <TradeAmount>,
    <cumsum(TradeAmount)>,
    <tagFunc(cumsum(TradeQty))>,
    <prev(cumsum(TradeAmount))>,
    <prev(tagFunc(cumsum(TradeQty)))>,
    <BuyNum>,
    <TotalBuyAmount>,
    <BuyOrderFlag>,
    <PrevTotalBuyAmount>,
    <PrevBuyOrderFlag>]

​<TradeTime>​ ​​, ​ ​<TradeAmount>​ ​​, ​ ​<BuyNum>​ ​​, ​ ​<TotalBuyAmount>​ ​​, ​ ​<BuyOrderFlag>​ ​​, ​ ​<PrevTotalBuyAmount>​ ​​, ​ ​<PrevBuyOrderFlag>​ ​是 無狀態 的計算,作用是保留原始表中這些欄位的原始資訊,輸入給下一層的響應式狀態引擎計算使用。​ ​<cumsum(TradeAmount)>​ ​​, ​ ​<tagFunc(cumsum(TradeQty))>​ ​​, ​ ​<prev(cumsum(TradeAmount))>​ ​​, ​ ​<prev(tagFunc(cumsum(TradeQty)))>​ ​是 有狀態 的計算,分別計算了每一條成交記錄所代表的股票按照此記錄的 賣單訂單號 合併後的累計成交金額、當前成交記錄合入後根據累計成交量判斷的大小單標籤、當前成交記錄合入前的累計成交金額、當前成交記錄合入前根據累計成交量判斷的大小單標籤,作用是作為第三層響應式狀態引擎中的​ ​dynamicGroupCumsum​ ​​, ​ ​dynamicGroupCumcount​ ​函式的輸入,增量計算賣方向的資金流指標。這些有狀態因子的計算都是通過 流式增量計算 的方法實現的。

為了方便開發者快速理解這塊程式碼的計算邏輯,下面我們輸入一些樣本資料來觀察第二層響應式狀態引擎的執行:

  • 第二層響應式狀態引擎的輸入為

  • 經過第二層響應式狀態引擎的處理後,輸出為

上述程式碼對股票程式碼為​ ​60000​ ​的逐筆成交資料按照 賣單訂單號​38446​ ​​, ​ ​70031​ ​​, ​ ​143303​ ​​, ​ ​155394​ ​​, ​ ​38433​ ​​進行合併計算,在響應式狀態引擎中對每一筆輸入都會進行一次響應計算,所以輸出結果的條數和輸入記錄的條數相等。結果表中的​ ​TotalSellAmount​ ​​, ​ ​SellOrderFlag​ ​​, ​ ​PrevTotalSellAmount​ ​​, ​ ​PrevSellOrderFlag​ ​分別代表每一條成交記錄所代表的股票按照此記錄的 賣單訂單號 合併後的累計成交金額、當前成交記錄合入後根據累計成交量判斷的大小單標籤、當前成交記錄合入前的累計成交金額、當前成交記錄合入前根據累計成交量判斷的大小單標籤,這些有狀態因子的計算都是通過 流式增量計算 的方法實現的。

2.5 根據股票合併的資金流指標的增量計算

def processCapitalFlowFunc(parallel){
  colName = `SecurityID`SellNum`TradeTime`TradeAmount`TotalSellAmount`SellOrderFlag`PrevTotalSellAmount`PrevSellOrderFlag`BuyNum`TotalBuyAmount`BuyOrderFlag`PrevTotalBuyAmount`PrevBuyOrderFlag
  colType =  [SYMBOL, INT, TIMESTAMP, DOUBLE, DOUBLE, INT, DOUBLE, INT,  INT, DOUBLE, INT, DOUBLE, INT]
  processSellOrder = table(1:0, colName, colType)
  metrics1 = <dynamicGroupCumsum(TotalSellAmount, PrevTotalSellAmount, SellOrderFlag, PrevSellOrderFlag, 3)> 
  metrics2 = <dynamicGroupCumcount(SellOrderFlag, PrevSellOrderFlag, 3)> 
  metrics3 = <dynamicGroupCumsum(TotalBuyAmount, PrevTotalBuyAmount, BuyOrderFlag, PrevBuyOrderFlag, 3)> 
  metrics4 = <dynamicGroupCumcount(BuyOrderFlag, PrevBuyOrderFlag, 3)>
  for(i in 1..parallel){
    createReactiveStateEngine(name="processCapitalFlow"+string(i), metrics=[<TradeTime>, <cumsum(TradeAmount)>, metrics1, metrics2, metrics3, metrics4], dummyTable=processSellOrder, outputTable=capitalFlowStream, keyColumn=`SecurityID, keepOrder=true)
  }
}
  • ​parallel​ ​​引數是指流計算的並行度,上述程式碼中是建立了​ ​parallel​ ​​個響應式狀態引擎3,這些響應式狀態引擎3的輸入是對應的​ ​parallel​ ​個響應式狀態引擎2的輸出,實現平行計算。這些響應式狀態引擎3的計算邏輯相同,但是處理的股票不同。
  • 上述程式碼中通過DolphinDB的響應式狀態引擎和內建的​ ​cumsum​ ​​,​ ​dynamicGroupCumsum​ ​​,​ ​dynamicGroupCumcount​ ​​函式實現流式增量計算,分組欄位為​ ​SecurityID​ ​,即股票程式碼。
  • ​metrics​ ​中的內容為響應式狀態引擎中以元程式碼形式表示的計算公式:
metrics1 = <dynamicGroupCumsum(TotalSellAmount, PrevTotalSellAmount, SellOrderFlag, PrevSellOrderFlag, 3)> 
metrics2 = <dynamicGroupCumcount(SellOrderFlag, PrevSellOrderFlag, 3)> 
metrics3 = <dynamicGroupCumsum(TotalBuyAmount, PrevTotalBuyAmount, BuyOrderFlag, PrevBuyOrderFlag, 3)> 
metrics4 = <dynamicGroupCumcount(BuyOrderFlag, PrevBuyOrderFlag, 3)>
metrics = [<TradeTime>, <cumsum(TradeAmount)>, metrics1, metrics2, metrics3, metrics4]

​<TradeTime>​ ​是 無狀態 的計算,作用是保留每一條計算結果的原始時間資訊。

​<cumsum(TradeAmount)>​ ​是 有狀態 的計算,表示從開盤到當前記錄,該只股票的總成交額。

​metrics1​ ​​中的​ ​<dynamicGroupCumsum(TotalSellAmount, PrevTotalSellAmount, SellOrderFlag, PrevSellOrderFlag, 3)>​ ​是 有狀態 的計算, 輸入 是當前成交記錄所代表的股票按照此記錄的 賣單訂單號 合併後的累計成交金額、當前成交記錄合入前的累計成交金額、當前成交記錄合入後根據累計成交量判斷的大小單標籤、當前成交記錄合入前根據累計成交量判斷的大小單標籤、大小單標籤數量, 輸出 是表示從開盤到當前記錄,該只股票的賣方向小單的總成交額、賣方向中單的總成交額、賣方向大單的總成交額。​ ​metrics2​ ​​中的​ ​<dynamicGroupCumcount(SellOrderFlag, PrevSellOrderFlag, 3)>​ ​是 有狀態 的計算, 輸入 是當前成交記錄所代表的股票按照此記錄的 賣單訂單號 合併後根據累計成交量判斷的大小單標籤、當前成交記錄合入前根據累計成交量判斷的大小單標籤、大小單標籤數量, 輸出 是表示從開盤到當前記錄,該只股票的賣方向小單的總訂單數、賣方向中單的總訂單數、賣方向大單的總訂單數。

​metrics3​ ​​和​ ​metrics4​ ​也都是 有狀態 的計算,表示買方向的資金流指標,與賣方向的計算邏輯相似,不在展開闡述。這些有狀態因子的計算都是通過 流式增量計算 的方法實現的。

為了方便開發者快速理解這塊程式碼的計算邏輯,下面我們輸入一些樣本資料來觀察第三層響應式狀態引擎的執行:

  • 第三層響應式狀態引擎的輸入為

  • 經過第三層響應式狀態引擎的處理後,輸出為

上圖為股票程式碼為​ ​60000​ ​的日累計逐單資金流指標計算結果。

在響應式狀態引擎中對每一筆輸入都會進行一次響應計算,所以輸出結果的條數和輸入記錄的條數相等。結果表中的​ ​TotalAmount​ ​​表示從開盤到當前記錄,該只股票的總成交額,計算表示式是​ ​<cumsum(TradeAmount)>​ ​​,輸入是每一筆交易的成交額,是通過​ ​響應式狀態引擎​ ​​和​ ​cumsum​ ​累計求和函式實現 流式增量計算 的。

結果表中的​ ​SellSmallAmount​ ​​, ​ ​SellMediumAmount​ ​​, ​ ​SellBigAmount​ ​​表示從開盤到當前記錄,該只股票的賣方向小單的總成交額、賣方向中單的總成交額、賣方向大單的總成交額,計算表示式是​ ​<dynamicGroupCumsum(TotalSellAmount, PrevTotalSellAmount, SellOrderFlag, PrevSellOrderFlag, 3)>​ ​ ,輸入是當前成交記錄所代表的股票按照此記錄的 賣單訂單號 合併後的累計成交金額、當前成交記錄合入前的累計成交金額、當前成交記錄合入後根據累計成交量判斷的大小單標籤、當前成交記錄合入前根據累計成交量判斷的大小單標籤和大小單標籤數量,是通過​ ​響應式狀態引擎​ ​​和​ ​dynamicGroupCumsum​ ​函式實現 流式增量計算 的,在日累計資金流實時計算場景中,隨著交易量的不斷增加,某個訂單的類別可能從一個小單變成大單,此時需要從小單累計統計量中減去該筆訂單已經累計的值,並在大單累計統計量中加上該筆訂單的最新累計值,​ ​dynamicGroupCumsum​ ​​函式即可應用在這類場景下。結果表中的​ ​SellSmallCount​ ​​, ​ ​SellMediumCount​ ​​, ​ ​SellBigCount​ ​​表示從開盤到當前記錄,該只股票的賣方向小單的總訂單數、賣方向中單的總訂單數、賣方向大單的總訂單數,計算表示式是​ ​<dynamicGroupCumcount(SellOrderFlag, PrevSellOrderFlag, 3)>​ ​ ,輸入是是當前成交記錄所代表的股票按照此記錄的 賣單訂單號 合併後根據累計成交量判斷的大小單標籤、當前成交記錄合入前根據累計成交量判斷的大小單標籤和大小單標籤數量,是通過​ ​響應式狀態引擎​ ​​和​ ​dynamicGroupCumcount​ ​函式實現 流式增量計算 的,在日累計資金流實時計算場景中,隨著交易量的不斷增加,某個訂單的類別可能從一個小單變成大單,此時需要從小單累計統計量中減1,並在大單累計統計量中加1,​ ​dynamicGroupCumcount​ ​函式即可應用在這類場景下。

結果表中的​ ​BuySmallAmount​ ​​, ​ ​BuyMediumAmount​ ​​, ​ ​BuyBigAmount​ ​​, ​ ​BuySmallCount​ ​​, ​ ​BuyMediumCount​ ​​, ​ ​BuyBigCount​ ​​表示買方向的日累計資金流指標,與賣方向的計算邏輯相似,不在展開闡述,也是通過​ ​響應式狀態引擎​ ​​和​ ​dynamicGroupCumsum​ ​​, ​ ​dynamicGroupCumcount​ ​函式實現 流式增量計算 的。

2.6 固定頻率往外推送計算結果

def processCapitalFlow60minFunc(){
  aggrMetrics = <[
    last(TotalAmount),
    last(SellSmallAmount),
    last(SellMediumAmount),
    last(SellBigAmount),
    last(SellSmallCount),
    last(SellMediumCount),
    last(SellBigCount),
    last(BuySmallAmount),
    last(BuyMediumAmount),
    last(BuyBigAmount),
    last(BuySmallCount),
    last(BuyMediumCount),
    last(BuyBigCount)]>
  createDailyTimeSeriesEngine(name="processCapitalFlow60min", windowSize=60000*60, step=60000*60, metrics=aggrMetrics, dummyTable=capitalFlowStream, outputTable=capitalFlowStream60min, timeColumn="TradeTime", useSystemTime=false, keyColumn=`SecurityID, useWindowStartTime=false)
  subscribeTable(tableName="capitalFlowStream", actionName="processCapitalFlow60min", offset=-1, handler=getStreamEngine("processCapitalFlow60min"), msgAsTable=true, batchSize=10000, throttle=1, hash=0)
}
  • 分組欄位為​ ​SecurityID​ ​,即股票程式碼。
  • 通過DolphinDB的時間序列聚合引擎對資金流指標結果表做實時的 60分鐘滾動視窗 計算, 聚合函式 為​ ​last​ ​。
  • 實時計算的資料來源為日累計資金流結果表capitalFlowStream,雖然該表的資料流量較大(和原始逐筆成交表的資料流量一樣大),但是由於是做簡單的60分鐘滾動指標計算,所以只需要單執行緒處理,不需要使用並行流處理。

2.7 註冊訂閱引擎和訂閱流資料表

parallel = 3
processCapitalFlowFunc(parallel)
go
processSellOrderFunc(parallel)
go
processBuyOrderFunc(parallel)
processCapitalFlow60minFunc()
  • ​parallel​ ​引數是指流計算的並行度。
  • 本教程設定​ ​parallel=3​ ​​,表示資金流計算的並行度為3,能夠支撐的上游逐筆交易資料的最大流量為5萬條每秒。2022年1月某日,滬深兩市全市場股票,在09:30:00開盤時候的逐筆交易資料流量峰值可以達到4.2萬筆每秒,所以生產環境部署的時候,為了避免因流量高峰時流處理堆積造成延時增加的現象,可以將​ ​parallel​ ​設定為3,提高系統實時計算的最大負載。

2.8 歷史資料回放

//replay history data
t = select * from loadTable("dfs://trade", "trade") where time(TradeTime) between 09:30:00.000 : 15:00:00.000 order by TradeTime, SecurityID
submitJob("replay_trade", "trade",  replay{t, tradeOriginalStream, `TradeTime, `TradeTime, 50000, true, 1})
getRecentJobs()

執行完後,返回如下資訊:

如果endTime和errorMsg為空,說明任務正在正常執行中。

3. 日累計資金流實時計算結果展示

3.1 節點內的計算結果表

計算結果表​ ​capitalFlowStream​ ​,可以通過DolphinDB所有API查詢介面實時查詢,通過DolphinDB GUI實時檢視該表的結果,返回:

3.2 固定頻率往外推送計算結果

計算結果表​ ​capitalFlowStream​ ​的資料量和逐筆成交資料量是一樣的,對每一筆成交記錄做一次響應。

如果開發者需要定時取一次截面上每一隻股票的最新日累計資金流指標,可以通過DolphinDB內建的時間序列計算引擎,在滾動視窗內取每一隻股票的最後一條計算結果即可。本教程中對計算結果表​ ​capitalFlowStream​ ​​進行了實時滾動視窗的計算,視窗大小是60分鐘,計算結果儲存在​ ​capitalFlowStream60min​ ​流資料表中,資料內容如下圖所示:

3.3 Grafana實時監控結果

4. 效能測試

本教程測試了單次響應計算和連續響應計算兩種場景。測試資料為上交所2020年某天1558只股票的逐筆成交資料。

4.1 單次響應計算效能測試

本教程使用了3個響應式狀態引擎串聯的流水線處理,單次響應計算時間為從第1個引擎收到輸入至第3個引擎輸出結果所經歷的時間,測試了單隻股票響應計算一次和1558只股票響應計算一次的效能。統計了10次的總耗時,取平均值作為單次的耗時。測試使用的伺服器CPU為Intel(R) Xeon(R) Silver 4216 CPU @ 2.10GHz。單執行緒情況下,測試結果如下:

股票個數

耗時(單位:ms)

1

0.18

1558

2.09

4.2 連續響應計算效能測試

本教程使用了3個響應式狀態引擎串聯的流水線處理,計算的並行度為3,能夠支撐的上游逐筆交易資料的最大流量為5萬條每秒。以上交所2020年某天1558只股票的1632萬條逐筆成交資料為測試資料,通過DolphinDB的​ ​replay​ ​回放工具,把歷史資料以流資料的方式注入到流計算最上游的流資料表tradeOriginalStream,回放速度是全速,計算總耗時是326秒,處理的能力是5萬條每秒。開發者可以增加計算的並行度,提高系統的處理能力。

5. 總結

DolphinDB內建的流資料框架支援流資料的釋出,訂閱,預處理,實時記憶體計算,複雜指標的滾動視窗計算、滑動視窗計算、累計視窗計算等,是一個執行高效、使用便捷的流資料處理框架。

本教程基於DolphinDB流資料處理框架,提供了一種實時計算日累計逐單資金流的低延時解決方案,旨在提高開發人員在使用 DolphinDB 內建的流資料框架開發流計算業務場景時的開發效率、降低開發難度,更好地挖掘 DolphinDB 在複雜實時流計算場景中的價值。

附件

業務程式碼

​01.清理環境並建立相關流資料表​

​02.註冊流計算引擎和訂閱流資料表​

​03.歷史資料回放​

​04.流計算狀態監控函式​

開發環境

  • CPU 型別:Intel(R) Xeon(R) Silver 4216 CPU @ 2.10GHz
  • 邏輯 CPU 總數:8
  • 記憶體:64GB
  • OS:64位 CentOS Linux 7 (Core)
  • 磁碟:SSD 盤,最大讀寫速率為 520MB/s
  • server 版本:1.30.18,2.00.6