更強大、更靈活、更全面丨一文搞懂DolphinDB視窗計算

語言: CN / TW / HK

在時序資料的處理中經常需要使用視窗計算。在DolphinDB中,視窗計算不僅僅應用於全量的歷史資料計算,還可以應用於增量的流計算。視窗函式既可應用於SQL(處理表中的列),也可應用於面板資料(處理矩陣中的列)。DolphinDB對於視窗計算進行了精心優化,與其它系統相比,擁有顯著的效能優勢。除此之外,DolphinDB的視窗函式使用上更加靈活,不僅內建的或自定義的vector函式都可用於視窗計算,而且可以多個函式巢狀使用。

本篇將系統的介紹DolphinDB的視窗計算,從概念劃分、應用場景、指標計算等角度,幫助使用者快速掌握和運用DolphinDB強大的視窗計算功能。

本篇所有程式碼支援DolphinDB 1.30.15,2.00.3及以上版本。

1.30.7,2.00.0以上版本支援絕大部分程式碼,細節部分會在小節內部詳細說明。

1. 視窗的概念及分類

DolphinDB內有四種視窗,分別是:滾動視窗、滑動視窗、累計視窗和不定長視窗(包括會話視窗和segment window)。

在DolphinDB中,視窗的度量標準有兩種:資料行數和時間。 為了方便理解,可以參考下圖:

本章節將介紹各個視窗型別的概念,具體的應用例項會在第2-4章節詳細介紹。

1.1 滾動視窗

滾動視窗將每個資料分配到一個指定大小的視窗中。通常滾動視窗大小固定,且相鄰兩個視窗沒有重複的元素。

滾動視窗根據度量標準的不同可以分為2種:

  • 以行數劃分視窗

圖1-1-1 指定視窗大小為3行記錄,橫座標以時間為單位。從圖上可以看出,按每三行記錄劃分為了一個視窗,視窗之間的元素沒有重疊。

  • 以時間劃分視窗

圖1-1-2 指定視窗大小為3個時間單位,橫座標以時間為單位。與以行數劃分視窗不同,按時間劃分的視窗內記錄數是不固定的。從兩個圖中可以看出,同一資料按不同單位型別劃分視窗的結果是不同的。

1.2 滑動視窗

滑動視窗的模式是:一定長度的視窗,根據步長,進行滑動。與滾動視窗不同,滑動視窗相鄰兩個視窗可能包括重複的元素。滑動視窗根據步長和視窗採用不同的度量標準,可以分成以下4種:

  • 步長為1行, 視窗為n行

圖1-2-1 指定視窗大小為6行記錄,視窗每次向後滑動1行記錄。

  • 步長為1行,視窗為指定時間

圖1-2-2 指定視窗大小為3個時間單位,視窗以右邊界為基準進行前向計算,視窗每次向後滑動1行記錄。

  • 步長為時間,視窗為n個步長時間

圖1-2-3 指定視窗大小為4個時間單位,每次向後滑動2個時間單位。

  • 步長為n行,視窗為m行

圖1-2-4 指定視窗大小為5行記錄,視窗每次向後滑動3行記錄。

1.3 累計視窗

累計視窗,即視窗的起始邊界固定,結束邊界累計右移。根據資料的增加或時間的增長,視窗的大小會變大。 累計視窗根據度量標準的不同可以分為2種:

  • 步長為指定時間單位

圖1-3-1 視窗右邊界每次右移2個時間單位,視窗大小累計增加。

  • 步長為1行

圖1-3-2 視窗右邊界每次右移1行,視窗大小累計增加。

1.4 不定長視窗

1.4.1 會話視窗

會話視窗是根據指定時間長度(session gap)切分視窗:若某條資料之後指定時間長度內無資料進入,則該條資料為一個視窗的終點,之後第一條新資料為另一個視窗的起點。 會話視窗的視窗大小可變,視窗的度量方式為時間。

1.4.2 segment視窗

segment視窗是根據給定的資料來切分視窗,連續的相同元素為一個視窗。視窗大小可變,視窗的度量方式為行。

2. SQL中的視窗計算以及視窗連線計算

SQL中的視窗計算一般涉及滾動視窗,滑動視窗,累計視窗以及segment視窗。DolphinDB中也有涉及視窗計算的window join視窗連線。本章將對上述幾個視窗計算一一介紹。

2.1 SQL中的視窗計算

2.1.1 滾動視窗

2.1.1.1 時間維度的滾動視窗

在SQL中,可使用interval,bar,dailyAlignedBar等函式配合group by語句實現滾動視窗的聚合計算。

bar函式為例,下面的例子是將10:00:00到10:05:59每秒更新的資料,每2分鐘統計一次交易量之和:

t=table(2021.11.01T10:00:00..2021.11.01T10:05:59 as time, 1..360 as volume)
select sum(volume) from t group by bar(time, 2m)

# output

bar_time            sum_volume
------------------- ----------
2021.11.01T10:00:00 7260      
2021.11.01T10:02:00 21660     
2021.11.01T10:04:00 36060 

bar函式的分組規則是根據每條記錄最近的能整除duration引數的時間作為開始時間的,但是對於一些開始時間不能直接被整除的場景,bar函式不適用。 在金融場景中,往往在交易時段之外也有一些資料輸入,但是在做資料分析的時候並不會用到這些資料;在期貨市場,通常涉及到兩個時間段,有些交易時段會隔天。dailyAlignedBar函式可以設定每天的起始時間和結束時間,很好地解決這類場景的聚合計算問題。

以期貨市場為例,資料模擬為國內期貨市場兩天的兩個交易時段下午1:30-3:00和晚上9:00-凌晨2:30。使用dailyAlignedBar函式計算每個交易時段中的7分鐘均價。

sessions = 13:30:00 21:00:00
ts = 2021.11.01T13:30:00..2021.11.01T15:00:00 join 2021.11.01T21:00:00..2021.11.02T02:30:00
ts = ts join (ts+60*60*24)
t = table(ts, rand(10.0, size(ts)) as price)

select avg(price) as price, count(*) as count from t group by dailyAlignedBar(ts, sessions, 7m) as k7

 # output
 
k7                  price             count
------------------- ----------------- -----
2021.11.01T13:30:00 4.815287529108381 420  
2021.11.01T13:37:00 5.265409774828835 420  
2021.11.01T13:44:00 4.984934388122167 420  
...
2021.11.01T14:47:00 5.031795592230213 420  
2021.11.01T14:54:00 5.201864532018313 361  
2021.11.01T21:00:00 4.945093814017518 420 


//如果使用bar函式會不達預期
select avg(price) as price, count(*) as count from t group by bar(ts, 7m) as k7

 # output

k7                  price             count
------------------- ----------------- -----
2021.11.01T13:26:00 5.220721067537347 180       //時間從13:26:00開始,不符合預期
2021.11.01T13:33:00 4.836406542137931 420  
2021.11.01T13:40:00 5.100716347573325 420  
2021.11.01T13:47:00 5.041169475132067 420  
2021.11.01T13:54:00 4.853431270784876 420  
2021.11.01T14:01:00 4.826169502311608 420  

interval函式的主要應用是插值。如期貨市場中有一些不活躍的期貨,一段時間內可能都沒有報價,但是在資料分析的時候需要每2秒都需要輸出該期貨的資料,缺失的資料根據前面的值進行插值;如果這2秒內有重複的值,則用最後一個作為輸出值。這個場景下就需要用到interval函式。

t=table(2021.01.01T01:00:00+(1..5 join 9..11) as time, take(`CLF1,8) as contract, 50..57 as price)

select last(contract) as contract, last(price) as price from t group by interval(time, 2s,"prev") 

 # output

interval_time       contract price
------------------- -------- -----
2021.01.01T01:00:00 CLF1     50   
2021.01.01T01:00:02 CLF1     52   
2021.01.01T01:00:04 CLF1     54   
2021.01.01T01:00:06 CLF1     54   
2021.01.01T01:00:08 CLF1     55   
2021.01.01T01:00:10 CLF1     57   

//如果使用bar函式會不達預期

select last(contract) as contract, last(price) as price from t group by bar(time, 2s)

bar_time            contract price
------------------- -------- -----
2021.01.01T01:00:00 CLF1     50   
2021.01.01T01:00:02 CLF1     52   
2021.01.01T01:00:04 CLF1     54   
2021.01.01T01:00:08 CLF1     55   
2021.01.01T01:00:10 CLF1     57       

2.1.1.2 記錄數維度的滾動視窗

除了時間維度可以做滾動視窗計算之外,記錄數維度也可以做滾動視窗計算。 在股票市場臨近收盤的時候,往往一分鐘之內的交易量、筆數是非常大的,做策略時如果單從時間維度去觸發可能會導致偏差。因此分析師有時會想要從每100筆交易而非每一分鐘的角度去做策略,這個時候就可以用rolling函式實現。

下面是某天股票市場最後一分鐘內對每100筆交易做成交量之和的例子:

t=table(2021.01.05T02:59:00.000+(1..2000)*100 as time, take(`CL,2000) as sym, 10* rand(50, 2000) as vol)

select rolling(last,time,100) as last_time,rolling(last,t.sym,100) as sym, rolling(sum,vol,100) as vol_100_sum from t 

 # output (每次結果會因為rand函式結果而不同)

last_time               sym vol_100_sum
----------------------- --- -----------
2021.01.05T02:59:00.100 CL  26480      
2021.01.05T02:59:00.200 CL  25250      
2021.01.05T02:59:00.300 CL  25910      
2021.01.05T02:59:00.400 CL  22890      
2021.01.05T02:59:00.500 CL  24000      
...   

2.1.2 滑動視窗

滑動視窗計算涉及步長與視窗長度兩個維度:

  • 步長是指計算如何觸發:每隔一定數量的行或者每隔一定時間長度;
  • 視窗長度是指每次計算時包含的資料量:一定數量的行的資料或者一定時間長度的資料。

2.1.2.1 步長為1行,視窗長度為n行

此類情況可使用m系列函式,moving函式,或者rolling。下面以msum為例,滑動計算視窗長度為5行的vol值之和。

t=table(2021.11.01T10:00:00 + 0 1 2 5 6 9 10 17 18 30 as time, 1..10 as vol)

select time, vol, msum(vol,5,1) from t

 # output

time                vol msum_vol
------------------- --- --------
2021.11.01T10:00:00 1   1       
2021.11.01T10:00:01 2   3       
2021.11.01T10:00:02 3   6       
2021.11.01T10:00:05 4   10      
2021.11.01T10:00:06 5   15    
...

DolphinDB SQL可以通過context by對各個不同的symbol在組內進行視窗計算。context by是DolphinDB獨有的功能,是對標準SQL語句的拓展,具體其他用法參照:context by

 t=table(2021.11.01T10:00:00 + 0 1 2 5 6 9 10 17 18 30 join 0 1 2 5 6 9 10 17 18 30 as time, 1..20 as vol, take(`A,10) join take(`B,10) as sym)

select time, sym, vol, msum(vol,5,1) from t context by sym

 # output

time                sym vol msum_vol
------------------- --- --- --------
2021.11.01T10:00:00 A   1   1       
2021.11.01T10:00:01 A   2   3       
2021.11.01T10:00:02 A   3   6       
...    
2021.11.01T10:00:30 A   10  40      
2021.11.01T10:00:00 B   11  11      
2021.11.01T10:00:01 B   12  23      
...    
2021.11.01T10:00:30 B   20  90 

m系列函式是經過優化的視窗函式,如果想要使用自定義函式做視窗計算,DolphinDB支援在moving函式和rolling函式中使用自定義聚合函式。下面以moving巢狀自定義聚合函式為例: 以下的行情資料有四列(程式碼,日期,close和volume),按照程式碼分組,組內按日期排序。設定視窗大小為20,在視窗期內按照volume排序,取volume最大的五條資料的平均close的計算。

//t是模擬的四列資料
t = table(take(`IBM, 100) as code, 2020.01.01 + 1..100 as date, rand(100,100) + 20 as volume, rand(10,100) + 100.0 as close)

//1.30.15及以上版本可以用一行程式碼實現
//moving支援使用者使用自定義匿名聚合函式(https://www.dolphindb.cn/cn/help/130/Functionalprogramming/AnonymousFunction.html)
select code, date, moving(defg(vol, close){return close[isort(vol, false).subarray(0:min(5,close.size()))].avg()}, (volume, close), 20) from t context by code 

//其他版本可以用自定義命名聚合函式實現:
defg top_5_close(vol,close){
return close[isort(vol, false).subarray(0:min(5,close.size()))].avg()
}
select code, date, moving(top_5_close,(volume, close), 20) from t context by code 

在做資料分析的時候,還會經常用到視窗巢狀視窗的操作。 舉一個更復雜的例子:在做101 Formulaic Alphas中98號因子計算的時候,DolphinDB可以運用視窗巢狀視窗的方法,將原本在C#中需要幾百行的程式碼,簡化成幾行程式碼,且計算效能也有接近三個數量級的提升。 trade表有需要可以自行模擬資料,或用sample資料CNTRADE

// 輸入表trade的schema如下,如需要可自行模擬資料。

name       typeString typeInt 
---------- ---------- ------- 
ts_code    SYMBOL     17             
trade_date DATE       6              
open       DOUBLE     16             
vol        DOUBLE     16             
amount     DOUBLE     16    

// alpha 98 計算:

def normRank(x){
	return rank(x)\x.size()
}

def alpha98SQL(t){
	update t set adv5 = mavg(vol, 5), adv15 = mavg(vol, 15) context by ts_code
	update t set rank_open = normRank(open), rank_adv15 = normRank(adv15) context by trade_date
	update t set decay7 = mavg(mcorr(vwap, msum(adv5, 26), 5), 1..7), decay8 = mavg(mrank(9 - mimin(mcorr(rank_open, rank_adv15, 21), 9), true, 7), 1..8) context by ts_code
	return select ts_code, trade_date, normRank(decay7)-normRank(decay8) as a98 from t context by trade_date 
}

input = select trade_date,ts_code,amount*1000/(vol*100 + 1) as vwap,vol,open from trade
timer alpha98DDBSql = alpha98SQL(input)

2.1.2.2 步長為1行,視窗為指定時間長度

此類情況可使用m系列或者tm系列函式。下面以tmsum為例,計算滑動視窗長度為5秒的vol值之和。

//1.30.14,2.00.2以上版本支援```tmsum```函式。
t=table(2021.11.01T10:00:00 + 0 1 2 5 6 9 10 17 18 30 as time, 1..10 as vol)
select time, vol, tmsum(time,vol,5s) from t

 # output
time                vol tmsum_time
------------------- --- ----------
2021.11.01T10:00:00 1   1         
2021.11.01T10:00:01 2   3         
2021.11.01T10:00:02 3   6         
2021.11.01T10:00:05 4   9         
2021.11.01T10:00:06 5   12        
2021.11.01T10:00:09 6   15        
2021.11.01T10:00:10 7   18        
2021.11.01T10:00:17 8   8         
2021.11.01T10:00:18 9   17        
2021.11.01T10:00:30 10  10  

實際場景中,計算曆史分位的時候也會廣泛運用到這類情況的視窗計算,具體在3.1.1介紹。

2.1.2.3 步長為時間長度,視窗為n個步長時間

此類情況可使用interval函式配合group by語句。下面的例子以5秒為視窗步長,10秒為視窗大小,計算vol值之和。

推薦使用1.30.14, 2.00.2及以上版本使用interval函式。

t=table(2021.11.01T10:00:00+0 3 5 6 7 8 15 18 20 29 as time, 1..10 as vol)
select sum(vol) from t group by interval(time, 10s, "null", 5s)

 # output

interval_time       sum_vol
------------------- -------
2021.11.01T10:00:00 21     
2021.11.01T10:00:05 18     
2021.11.01T10:00:10 15       
2021.11.01T10:00:15 24     
2021.11.01T10:00:20 19     
2021.11.01T10:00:25 10  

2.1.1.1中interval的場景可以看作是視窗大小與步長相等的特殊的滑動視窗,而本節則是視窗大小為n倍步長時間的滑動視窗。interval函式為資料分析提供了更便捷的工具。

2.1.2.4 步長為n行,視窗為k*n行

此類情況可使用高階函式rolling。下面的例子計算步長為3行,視窗長度為6行的vol值之和。與interval函式不同的是,rolling不會對缺失值進行插值,如果視窗內的元素個數不足視窗大小,該視窗不會被輸出。 該例子中,資料一共是10條,在前兩個視窗計算完之後,第三個視窗因為只有4條資料,所以不輸出第三個視窗的結果。

t=table(2021.11.01T10:00:00+0 3 5 6 7 8 15 18 20 29 as time, 1..10 as vol)
select rolling(last,time,6,3) as last_time, rolling(sum,vol,6,3) as sum_vol from t

 # output

last_time           sum_vol
------------------- -------
2021.11.01T10:00:08 21     
2021.11.01T10:00:20 39

2.1.3 累計視窗

累計視窗有兩種情況:一種是步長是1行,另一種是步長為指定時間長度。

2.1.3.1 步長為1行

步長為1行的累計視窗計算在SQL中通常直接用cum系列函式。下面的是累計求和cumsum的例子:

t=table(2021.11.01T10:00:00..2021.11.01T10:00:04 join 2021.11.01T10:00:06..2021.11.01T10:00:10 as time,1..10 as vol)
select *, cumsum(vol) from t 

# output

time                vol cum_vol
------------------- --- -------
2021.11.01T10:00:00 1   1      
2021.11.01T10:00:01 2   3      
2021.11.01T10:00:02 3   6      
2021.11.01T10:00:03 4   10     
2021.11.01T10:00:04 5   15     
2021.11.01T10:00:06 6   21     
2021.11.01T10:00:07 7   28     
2021.11.01T10:00:08 8   36     
2021.11.01T10:00:09 9   45     
2021.11.01T10:00:10 10  55  

在實際場景中經常會用cum系列函式與context by連用,做分組內累計計算。比如行情資料中,根據各個不同股票的程式碼,做各自的累計成交量。

t=table(2021.11.01T10:00:00 + 0 1 2 5 6 9 10 17 18 30 join 0 1 2 5 6 9 10 17 18 30 as time, 1..20 as vol, take(`A,10) join take(`B,10) as sym)
select*, cumsum(vol) as cumsum_vol from t context by sym

# output

time                vol sym cumsum_vol
------------------- --- --- ----------
2021.11.01T10:00:00 1   A   1         
2021.11.01T10:00:01 2   A   3         
...      
2021.11.01T10:00:18 9   A   45        
2021.11.01T10:00:30 10  A   55        
2021.11.01T10:00:00 11  B   11        
2021.11.01T10:00:01 12  B   23        
...      
2021.11.01T10:00:18 19  B   135       
2021.11.01T10:00:30 20  B   155    

2.1.3.2 步長為指定時間長度

要在SQL中實現步長為指定時間長度的累計視窗計算,可以使用bar類函式搭配cgroup來實現。

t=table(2021.11.01T10:00:00..2021.11.01T10:00:04 join 2021.11.01T10:00:06..2021.11.01T10:00:10 as time,1..10 as vol)
select sum(vol) from t cgroup by bar(time, 5s) as time order by time

# output

time                sum_vol
------------------- -------
2021.11.01T10:00:00 15     
2021.11.01T10:00:05 45     
2021.11.01T10:00:10 55  

2.1.4 segment視窗

以上所有例子中,視窗大小均固定。在DolphinDB中亦可將連續的相同元素做為一個視窗,用segment來實現。下面的例子是根據order_type中的資料進行視窗分割,進行累計求和計算。 實際場景中,segment經常用於逐筆資料中,連續相同的order_type做累計成交額。

    vol = 0.1 0.2 0.1 0.2 0.1 0.2 0.1 0.2 0.1 0.2 0.1 0.2
order_type = 0 0 1 1 1 2 2 1 1 3 3 2;
t = table(vol,order_type);
select *, cumsum(vol) as cumsum_vol from t context by segment(order_type);

# output

vol order_type cumsum_vol
--- ---------- ----------
0.1 0          0.1       
0.2 0          0.3       
0.1 1          0.1       
0.2 1          0.3       
0.1 1          0.4       
0.2 2          0.2       
0.1 2          0.3       
0.2 1          0.2       
0.1 1          0.3       
0.2 3          0.2       
0.1 3          0.3       
0.2 2          0.2       

2.2 SQL中的視窗連線計算

在DolphinDB中,除了常規的視窗計算之外,還支援視窗連線計算。即在表連線的同時,進行視窗計算。這裡用到的函式有wjpwj

window join在表連線的同時對右表進行步長為1行,視窗為時間長度的視窗計算。因為視窗的左右邊界均可以指定,也可以為負數,所以也可以看作非常靈活的滑動視窗。詳細用法參見使用者手冊window join

   //data
t1 = table(1 1 2 as sym, 09:56:06 09:56:07 09:56:06 as time, 10.6 10.7 20.6 as price)
t2 = table(take(1,10) join take(2,10) as sym, take(09:56:00+1..10,20) as time, (10+(1..10)\10-0.05) join (20+(1..10)\10-0.05) as bid, (10+(1..10)\10+0.05) join (20+(1..10)\10+0.05) as offer, take(100 300 800 200 600, 20) as volume);

//window join calculation
wj(t1, t2, -5s:0s, <avg(bid)>, `sym`time);

# output

sym time     price  avg_bid           
--- -------- ----- -------
1   09:56:06 10.6 10.3
1   09:56:07 10.7 10.4
2   09:56:06 20.6 20.3      

由於視窗可以靈活設定,所以不僅是多表連線的時候會用到,單表內部的視窗計算也可以用到window join。下面的例子可以看作是t2表中每一條資料做一個(time-6s)到(time+1s)的計算。

t2 = table(take(1,10) join take(2,10) as sym, take(09:56:00+1..10,20) as time, (10+(1..10)\10-0.05) join (20+(1..10)\10-0.05) as bid, (10+(1..10)\10+0.05) join (20+(1..10)\10+0.05) as offer, take(100 300 800 200 600, 20) as volume);

wj(t2, t2, -6s:1s, <avg(bid)>, `sym`time);

# output

sym time     bid   offer volume avg_bid           
--- -------- ---- ------ ------ --------
1   09:56:01 10.05 10.15 100    10.1
...  
1   09:56:08 10.75 10.85 800    10.5              
1   09:56:09 10.85 10.95 200    10.6
1   09:56:10 10.95 11.05 600    10.65             
2   09:56:01 20.05 20.15 100    20.1
2   09:56:02 20.15 20.25 300    20.15
...
2   09:56:08 20.75 20.85 800    20.5              
2   09:56:09 20.85 20.9  200    20.6
2   09:56:10 20.95 21.05 600    20.65

3.面板資料使用視窗計算

在DolphinDB中,面板資料可以是矩陣也可以是表。表的視窗計算在前一章節已經描述,所以在這一章節中著重討論矩陣的計算。

3.1 面板資料的滑動視窗計算

滑動視窗m系列函式也可以適用於面板資料,即在矩陣每列內進行計算,返回一個與輸入矩陣維度相同的矩陣。如果滑動維度為時間,則要先使用setIndexedMatrix!函式將矩陣的行與列標籤設為索引。這裡需要注意的是,行與列標籤均須嚴格遞增。在矩陣計算中,IndexedMatrix可以幫助對齊行與列的不同標籤,非常實用。通常我們會使用pivot by語句配合exec或者panel函式將豎錶轉化為寬表(矩陣),因為這個操作會將矩陣的行與列按遞增方式排列,方便我們設定索引矩陣以及後期的計算。

首先我們新建一個矩陣,並將其設為IndexedMatrix:

m=matrix(1..4 join 6, 11..13 join 8..9)
m.rename!(2020.01.01..2020.01.04 join 2020.01.06,`A`B)
m.setIndexedMatrix!();

面板資料的滑動視窗大小支援兩種度量方式:記錄數和時間。

3.1.1 步長為1行,視窗為n行

m系列函式的引數可以是一個正整數(記錄數維度)或一個 duration(時間維度)。通過設定不同的引數,可以指定理想的滑動視窗型別。

msum滑動求和為例。以下例子是對一個矩陣內部,每一列做視窗大小為3行的滑動求和。

msum(m,3,1)

# output

           A  B 
           -- --
2020.01.01|1  11
2020.01.02|3  23
2020.01.03|6  36
2020.01.04|9  33
2020.01.06|13 30

矩陣運算中,也可以做複雜的視窗巢狀。曾在2.1.2.1節中提到的98號因子也可以在矩陣中通過幾行程式碼實現(trade表有需要可以自行模擬資料,或用sample資料CNTRADE):

// 輸入表trade的schema如下,如需要可自行模擬資料:

name       typeString typeInt 
---------- ---------- ------- 
ts_code    SYMBOL     17             
trade_date DATE       6              
open       DOUBLE     16             
vol        DOUBLE     16             
amount     DOUBLE     16   

// alpha 98 的矩陣計算

def prepareDataForDDBPanel(){
	t = select trade_date,ts_code,amount*1000/(vol*100 + 1) as vwap,vol,open from trade 
	return dict(`vwap`open`vol, panel(t.trade_date, t.ts_code, [t.vwap, t.open, t.vol]))
}

def myrank(x) {
	return rowRank(x)\x.columns()
}

def alpha98Panel(vwap, open, vol){
	return myrank(mavg(mcorr(vwap, msum(mavg(vol, 5), 26), 5), 1..7)) - myrank(mavg(mrank(9 - mimin(mcorr(myrank(open), myrank(mavg(vol, 15)), 21), 9), true, 7), 1..8))
}

input = prepareDataForDDBPanel()
alpha98DDBPanel = alpha98Panel(input.vwap, input.open, input.vol)

3.1.2 步長為1行,視窗為指定時間

msum滑動求和為例。以下例子是對一個矩陣內部,每一列根據左邊的時間列做視窗大小為3天的滑動求和。

msum(m,3d)

# output

           A  B 
           -- --
2020.01.01|1  11
2020.01.02|3  23
2020.01.03|6  36
2020.01.04|9  33
2020.01.06|10 17

在實際運用中,這類矩陣視窗運算是非常常見的。比如在做歷史分位的計算中,將資料轉化為IndexedMatrix之後,直接用一行程式碼就可以得到結果了。

下面例子是對m矩陣做10年的歷史分位計算:

//推薦使用1.30.14, 2.00.2及以上版本來使用interval函式。
mrank(m, true, 10y, percent=true)

# output
           A B   
           - ----
2020.01.01|1 1   
2020.01.02|1 1   
2020.01.03|1 1   
2020.01.04|1 0.25
2020.01.06|1 0.4 

3.2 面板資料的累計視窗計算

在面板資料中,累計函式cum系列也可以直接使用。 以cumsum為例:

cumsum(m)

 # output 

            A  B 
           -- --
2020.01.01|1  11
2020.01.02|3  23
2020.01.03|6  36
2020.01.04|10 44
2020.01.06|16 53

4.流式資料的視窗計算

在DolphindDB中,設計了許多內建的流計算引擎。各類引擎都有不同的用法,有些支援聚合計算,有些則支援滑動視窗或者累計視窗計算,在此基礎上也有針對於流資料的會話視窗引擎。可以滿足不同的場景需求。下面根據不同視窗以及引擎分別介紹。

4.1 滾動視窗在流計算中的應用

實際場景中,滾動視窗計算在流資料中應用得最為廣泛。比如5分鐘k線,1分鐘累計交易量等等的應用,都需要用到滾動視窗計算。滾動視窗在流計算中的應用是通過各種時間序列引擎實現的。

createTimeSeriesEngine時間序列引擎應用的很廣泛,與他類似的引擎還有createDailyTimeSeriesEnginecreateSessionWindowEnginecreateDailyTimeSeriesEnginedailyAlignedBar類似,可以指定自然日之內的時間段進行視窗計算,而非按照流入資料的時間視窗聚合計算。createSessionWindowEngine會在4.3中詳細介紹。 本節以createTimeSeriesEngine為例。下例中,時間序列引擎timeSeries1訂閱流資料表trades,實時計算表trades中過去1分鐘內每隻股票交易量之和。

share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades
output1 = table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
timeSeries1 = createTimeSeriesEngine(name="timeSeries1", windowSize=60000, step=60000, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output1, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50, useWindowStartTime=false)
subscribeTable(tableName="trades", actionName="timeSeries1", offset=0, handler=append!{timeSeries1}, msgAsTable=true);

insert into trades values(2018.10.08T01:01:01.785,`A,10)
insert into trades values(2018.10.08T01:01:02.125,`B,26)
insert into trades values(2018.10.08T01:01:10.263,`B,14)
insert into trades values(2018.10.08T01:01:12.457,`A,28)
insert into trades values(2018.10.08T01:02:10.789,`A,15)
insert into trades values(2018.10.08T01:02:12.005,`B,9)
insert into trades values(2018.10.08T01:02:30.021,`A,10)
insert into trades values(2018.10.08T01:04:02.236,`A,29)
insert into trades values(2018.10.08T01:04:04.412,`B,32)
insert into trades values(2018.10.08T01:04:05.152,`B,23)

sleep(10)

select * from output1;

 # output

time                    sym sumVolume
----------------------- --- ---------
2018.10.08T01:02:00.000 A   38       
2018.10.08T01:02:00.000 B   40       
2018.10.08T01:03:00.000 A   25       
2018.10.08T01:03:00.000 B   9       


//to drop the time series engine
dropStreamEngine(`timeSeries1)
unsubscribeTable(tableName="trades", actionName="timeSeries1")
undef("trades",SHARED)

4.2 滑動、累計視窗在流計算中的應用

另一個常用的引擎是響應式狀態引擎createReactiveStateEngine。在這個引擎中,我們可以使用經過優化的狀態函式,其中包括累計視窗函式(cum系列函式)和滑動視窗函式(m系列函式以及tm系列函式)。

createReactiveStateEngine響應式狀態引擎的功能非常強大,可以讓流資料像SQL一樣處理,實現批流一體。下面的例子同時展示了cum系列函式,m系列函式和tm系列函式在createReactiveStateEngine響應式狀態引擎中的作用。

//1.30.14,2.00.2以上版本支援tmsum函式。
share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades
output2 = table(10000:0, `sym`time`Volume`msumVolume`cumsumVolume`tmsumVolume, [ SYMBOL,TIMESTAMP,INT, INT,INT,INT])
reactiveState1= createReactiveStateEngine(name="reactiveState1", metrics=[<time>,<Volume>,<msum(volume,2,1)>,<cumsum(volume)>,<tmsum(time,volume,2m)>], dummyTable=trades, outputTable=output2, keyColumn="sym")
subscribeTable(tableName="trades", actionName="reactiveState1", offset=0, handler=append!{reactiveState1}, msgAsTable=true);

insert into trades values(2018.10.08T01:01:01.785,`A,10)
insert into trades values(2018.10.08T01:01:02.125,`B,26)
insert into trades values(2018.10.08T01:01:10.263,`B,14)
insert into trades values(2018.10.08T01:01:12.457,`A,28)
insert into trades values(2018.10.08T01:02:10.789,`A,15)
insert into trades values(2018.10.08T01:02:12.005,`B,9)
insert into trades values(2018.10.08T01:02:30.021,`A,10)
insert into trades values(2018.10.08T01:04:02.236,`A,29)
insert into trades values(2018.10.08T01:04:04.412,`B,32)
insert into trades values(2018.10.08T01:04:05.152,`B,23)

sleep(10)

select * from output2

 # output

sym time                    Volume msumVolume cumsumVolume tmsumVolume
--- ----------------------- ------ ---------- ------------ -----------
A   2018.10.08T01:01:01.785 10     10         10           10         
B   2018.10.08T01:01:02.125 26     26         26           26         
A   2018.10.08T01:01:12.457 28     38         38           38         
B   2018.10.08T01:01:10.263 14     40         40           40         
A   2018.10.08T01:02:10.789 15     43         53           53         
B   2018.10.08T01:02:12.005 9      23         49           49         
A   2018.10.08T01:02:30.021 10     25         63           63         
A   2018.10.08T01:04:02.236 29     39         92           54         
B   2018.10.08T01:04:04.412 32     41         81           41         
B   2018.10.08T01:04:05.152 23     55         104          64           

//to drop the reactive state engine

dropAggregator(`reactiveState1)
unsubscribeTable(tableName="trades", actionName="reactiveState1")
undef("trades",SHARED)

4.3 會話視窗引擎

createSessionWindowEngine可以根據間隔時間(session gap)切分不同的視窗,即當一個視窗在大於session gap的時間內沒有接收到新資料時i,視窗會關閉。所以這個引擎中的window size是會根據流入資料的情況發生變化的。

具體可以看以下例子:

share streamTable(1000:0, `time`volume, [TIMESTAMP, INT]) as trades
output1 = keyedTable(`time,10000:0, `time`sumVolume, [TIMESTAMP, INT])
engine_sw = createSessionWindowEngine(name = "engine_sw", sessionGap = 5, metrics = <sum(volume)>, dummyTable = trades, outputTable = output1, timeColumn = `time)
subscribeTable(tableName="trades", actionName="append_engine_sw", offset=0, handler=append!{engine_sw}, msgAsTable=true)

n = 5
timev = 2018.10.12T10:01:00.000 + (1..n)
volumev = (1..n)%1000
insert into trades values(timev, volumev)

n = 5
timev = 2018.10.12T10:01:00.010 + (1..n)
volumev = (1..n)%1000
insert into trades values(timev, volumev)

n = 3
timev = 2018.10.12T10:01:00.020 + (1..n)
volumev = (1..n)%1000
timev.append!(2018.10.12T10:01:00.027 + (1..n))
volumev.append!((1..n)%1000)
insert into trades values(timev, volumev)

select * from trades;

//傳入資料如下:

 time                    volume
----------------------- ------
2018.10.12T10:01:00.001 1     
2018.10.12T10:01:00.002 2     
2018.10.12T10:01:00.003 3     
2018.10.12T10:01:00.004 4     
2018.10.12T10:01:00.005 5     
2018.10.12T10:01:00.011 1     
2018.10.12T10:01:00.012 2     
2018.10.12T10:01:00.013 3     
2018.10.12T10:01:00.014 4     
2018.10.12T10:01:00.015 5     
2018.10.12T10:01:00.021 1     
2018.10.12T10:01:00.022 2     
2018.10.12T10:01:00.023 3     
2018.10.12T10:01:00.028 1     
2018.10.12T10:01:00.029 2     
2018.10.12T10:01:00.030 3    


//經過createSessionWindowEngine會話視窗引擎後,根據session gap=5(ms)聚合形成的視窗計算結果為:
select * from output1

time                    sumVolume
----------------------- ---------
2018.10.12T10:01:00.001 15       
2018.10.12T10:01:00.011 15       
2018.10.12T10:01:00.021 6    

// to drop SessionWindowEngine

unsubscribeTable(tableName="trades", actionName="append_engine_sw")
dropAggregator(`engine_sw)
undef("trades",SHARED)

5.視窗計算的空值處理規則

在DolphinDB中,各個視窗函式的空值處理略有不同,此處分別講述一下各個系列函式空值處理的規則:

5.1 moving,m系列函式,tm系列函式以及cum系列函式的空值處理

對於除了rank的m系列,tm系列以及cum系列視窗內的NULL值,與其聚合函式處理NULL值的規則一致,計算時忽略NULL值。 在mranktmrank以及cumrank函式中,可以指定NULL是否參與計算。

大部分moving以及m系列函式引數裡都有一個可選引數 minPeriods。若沒有指定 minPeriods,結果的前(window - 1)個元素為NULL;若指定了 minPeriods,結果的前( minPeriods - 1)個元素為NULL。如果視窗中的值全為NULL,該視窗的計算結果為NULL。minPeriods的預設值為window之值。

一個簡單的例子:

m=matrix(1..5, 6 7 8 NULL 10)

//不指定minPeriods時,由於minPeriods預設值與window相等,所以結果的前二行均為NULL。

msum(m,3)

 #0 #1
-- --
     
     
6  21
9  15
12 18

//若指定minPeriods=1,結果的前二行不是NULL值。

 msum(m,3,1)

 #0 #1
-- --
1  6 
3  13
6  21
9  15
12 18

5.2 rolling的空值處理

moving函式不同的是,rolling函式不輸出前(window - 1)個元素的NULL值結果。可以通過下面的例子來感受:

t是一個包含NULL值的表,我們分別用rollingmoving對vol這一列做視窗為3行的視窗求和計算。

vol=1 2 3 4 NULL NULL NULL 6 7 8
t= table(vol)

//rolling做視窗為3行的滑動求和計算
rolling(sum,t.vol,3)

 # output
[6,9,7,4,,6,13,21]

//moving做視窗為3行的滑動求和計算
moving(sum,t.vol,3)

 # output
[,,6,9,7,4,,6,13,21]

//rolling做視窗為3行,步長為2行的視窗計算
rolling(sum,t.vol,3,2)

 # output
[6,7,,13]     ///最後的視窗沒有足夠的元素時,不會輸出

6. 常用指標的計算複雜度

常用的m系列,tm系列函式都經過了優化,其時間複雜度為O(n),即每一次計算結果只會把位置0去掉,加入新的觀察值。 而mrank與其他函式稍許不同,計算速度會比其他的慢,原因是其時間複雜度為O(mn),與其視窗大小有關,視窗越大,複雜度越高。即每一次都會將結果重置。

moving,tmoving,rolling這些高階函式的複雜度與其引數內的func有關,是沒有做過優化的。所以每一次滑動都是整個視窗對於func函式進行計算,而非m系列,tm系列函式的增量計算。

故相比於moving, tmoving, rolling, m系列和tm系列函式對於相同的計算功能會有更好的效能。

一個簡單的例子:

n=1000000
x=norm(0,1, n);

//moving
timer moving(avg, x, 10);
Time elapsed:  243.331 ms

//rolling
timer moving(avg, x, 10);
Time elapsed: 599.389ms

//mavg
timer mavg(x, 10);
Time elapsed: 3.501ms

7. 涉及到視窗計算的函式 

聚合函式 m系列 ReactiveStateEngine 是否支援 tm系列 ReactiveStateEngine 是否支援 cum系列 ReactiveStateEngine 是否支援
  moving tmoving    
avg mavg tmavg cumavg
sum msum tmusm cumsum
beta mbeta tmbeta cumbeta
corr mcorr tmcorr cumcorr
count mcount tmcount cumcount
covar mcovar tmcovar cumcovar
imax mimax        
imin mimin        
max mmax tmmax cummax
min mmin tmmin cummin
first mmfirst tmfirst    
last mlast tmlast    
med mmed tmmed cummed  
prod mprod tmprod cumprod
var mvar tmvar cumvar
varp mvarp tmvarp cumvarp
std mstd tmstd cumstd
stdp mstdp tmstdp cumstdp
skew mskew tmskew    
kurtosis mkurtosis tmkurtosis    
percentile mpercentile tmpercentile cumpercentile  
rank mrank tmrank cumrank  
wsum mwsum tmwsum cumwsum
wavg mwavg tmwavg cumwavg
firstNot         cumfirstNot
lastNot         cumlastNot
mad mmad        
  move tmmove    
  mslr        
  ema        
  kama        
  sma        
  wma        
  dema        
  tema        
  trima        
  t3        
  ma        
  wilder        
  gema        
  linearTimeTrend        
  mmse          
          cumPositiveStreak  

其他涉及視窗的函式:

deltas, ratios, interval, bar, dailyAlignedBar, coevent, createReactiveStateEngine, createDailyTimeSeriesEngine, createReactiveStateEngine, createSessionWindowEngine

8. 總結

DolphinDB中的視窗函式功能非常齊全。合理運用視窗,能夠簡便地實現各種複雜邏輯,使資料分析步驟更簡潔,效率更高。