DolphinDB 函式化程式設計案例教程

語言: CN / TW / HK

DolphinDB支援函式化程式設計:函式物件可以作為高階函式的引數。這提高了程式碼表達能力,可以簡化程式碼,複雜的任務可以通過一行或幾行程式碼完成。

本教程介紹了一些常見場景下的函式化程式設計案例,重點介紹 DolphinDB 的高階函式及其使用場景。

內容主要包括:

  1. 資料匯入
  2. Lambda表示式
  3. 高階函式使用案例
  4. 部分應用案例
  5. 金融場景相關案例
  6. 機器學習相關案例

 

1. 資料匯入

1.1 整型時間轉化為 TIME 格式並匯入

CSV 資料檔案中常用整數表示時間,如 “93100000” 表示 “9:31:00.000”。為了便於查詢分析,建議將這類資料轉換為時間型別,再儲存到 DolphinDB 資料庫中。

針對這種場景,可通過 loadTextEx 函式的 transform 引數將文字檔案中待轉化的時間列指定為相應的資料型別。

本例中會用到 CSV 檔案 candle_201801.csv,資料樣本如下:

symbol,exchange,cycle,tradingDay,date,time,open,high,low,close,volume,turnover,unixTime
000001,SZSE,1,20180102,20180102,93100000,13.35,13.39,13.35,13.38,2003635,26785576.72,1514856660000
000001,SZSE,1,20180102,20180102,93200000,13.37,13.38,13.33,13.33,867181
......

(1)建庫

用指令碼建立如下分散式資料庫(按天進行值分割槽):

login(`admin,`123456)
dataFilePath="/home/data/candle_201801.csv"
dbPath="dfs://DolphinDBdatabase"
db=database(dbPath,VALUE,2018.01.02..2018.01.30)

(2)建表

下面先通過 extractTextSchema 函式獲取資料檔案的表結構。csv 檔案中的 time 欄位被識別為整型。若要將其存為 TIME 型別,可以通過 update 語句更新表結構將其轉換為 TIME 型別,然後用更新後的表結構來建立分散式表。該分散式表的分割槽列是 date 列。

schemaTB=extractTextSchema(dataFilePath)
update schemaTB set type="TIME" where name="time"
tb=table(1:0, schemaTB.name, schemaTB.type)
tb=db.createPartitionedTable(tb, `tb1, `date);
這裡通過   extractTextSchema  獲取表結構。使用者也可以自定義表結構。

(3)匯入資料

可以通過自定義函式 i2t 對時間列 time 進行預處理,將其轉換為 TIME 型別,並返回處理後的資料表。

def i2t(mutable t){
    return t.replaceColumn!(`time, t.time.format("000000000").temporalParse("HHmmssSSS"))
}
請注意:在自定義函式體內對資料進行處理時,請儘量使用本地的修改(以!結尾的函式)來提升效能。

呼叫 loadTextEx 函式匯入 csv 檔案的資料到分散式表,這裡指定 transform 引數為 i2t 函式,匯入時會自動應用 i2t 函式處理資料。

tmpTB=loadTextEx(dbHandle=db, tableName=`tb1, partitionColumns=`date, filename=dataFilePath, transform=i2t);

(4)查詢資料

查看錶內前 2 行資料,可以看到結果符合預期。

select top 2 * from loadTable(dbPath,`tb1);

symbol exchange cycle tradingDay date       time               open  high  low   close volume  turnover   unixTime
------ -------- ----- ---------- ---------- --------------     ----- ----- ----- ----- ------- ---------- -------------
000001 SZSE     1     2018.01.02 2018.01.02 09:31:00.000       13.35 13.39 13.35 13.38 2003635 2.678558E7 1514856660000
000001 SZSE     1     2018.01.02 2018.01.02 09:32:00.000       13.37 13.38 13.33 13.33 867181  1.158757E7 1514856720000

完整程式碼如下:

login(`admin,`123456)
dataFilePath="/home/data/candle_201801.csv"
dbPath="dfs://DolphinDBdatabase"
db=database(dbPath,VALUE,2018.01.02..2018.01.30)
schemaTB=extractTextSchema(dataFilePath)
update schemaTB set type="TIME" where name="time"
tb=table(1:0,schemaTB.name,schemaTB.type)
tb=db.createPartitionedTable(tb,`tb1,`date);

def i2t(mutable t){
    return t.replaceColumn!(`time,t.time.format("000000000").temporalParse("HHmmssSSS"))
}

tmpTB=loadTextEx(dbHandle=db,tableName=`tb1,partitionColumns=`date,filename=dataFilePath,transform=i2t);
關於文字匯入的相關函式和案例,可以參考   DolphinDB資料匯入教程

1.2 有納秒時間戳的文字匯入

本例將以整數型別儲存的納秒級資料匯入為NANOTIMESTAMP型別。本例使用文字檔案 nx.txt,資料樣本如下:

SendingTimeInNano#securityID#origSendingTimeInNano#bidSize
1579510735948574000#27522#1575277200049000000#1
1579510735948606000#27522#1575277200049000000#2
...

每一行記錄通過字元'#'來分隔列,SendingTimeInNano 和 origSendingTimeInNano 用於儲存納秒時間戳。

(1)建庫建表

首先定義分散式資料庫和表,指令碼如下:

dbSendingTimeInNano = database(, VALUE, 2020.01.20..2020.02.22);
dbSecurityIDRange = database(, RANGE,  0..10001);
db = database("dfs://testdb", COMPO, [dbSendingTimeInNano, dbSecurityIDRange]);

nameCol = `SendingTimeInNano`securityID`origSendingTimeInNano`bidSize;
typeCol = [`NANOTIMESTAMP,`INT,`NANOTIMESTAMP,`INT];
schemaTb = table(1:0,nameCol,typeCol);

db = database("dfs://testdb");
nx = db.createPartitionedTable(schemaTb, `nx, `SendingTimeInNano`securityID);

上述指令碼建立了一個 組合分割槽 的資料庫,然後根據文字的欄位和型別建立了表 nx。

(2)匯入資料

匯入資料時,使用函式 nanotimestamp,將文字中的整型轉化為 NANOTIMESTAMP 型別:

def dataTransform(mutable t){
  return t.replaceColumn!(`SendingTimeInNano, nanotimestamp(t.SendingTimeInNano)).replaceColumn!(`origSendingTimeInNano, nanotimestamp(t.origSendingTimeInNano))
}

最終通過 loadTextEx 匯入資料。

完整程式碼如下:

dbSendingTimeInNano = database(, VALUE, 2020.01.20..2020.02.22);
dbSecurityIDRange = database(, RANGE,  0..10001);
db = database("dfs://testdb", COMPO, [dbSendingTimeInNano, dbSecurityIDRange]);

nameCol = `SendingTimeInNano`securityID`origSendingTimeInNano`bidSize;
typeCol = [`NANOTIMESTAMP,`INT,`NANOTIMESTAMP,`INT];
schemaTb = table(1:0,nameCol,typeCol);

db = database("dfs://testdb");
nx = db.createPartitionedTable(schemaTb, `nx, `SendingTimeInNano`securityID);

def dataTransform(mutable t){
  return t.replaceColumn!(`SendingTimeInNano, nanotimestamp(t.SendingTimeInNano)).replaceColumn!(`origSendingTimeInNano, nanotimestamp(t.origSendingTimeInNano))
}

pt=loadTextEx(dbHandle=db,tableName=`nx , partitionColumns=`SendingTimeInNano`securityID,filename="nx.txt",delimiter='#',transform=dataTransform);

2. Lambda 表示式

DolphinDB 中可以建立自定義函式,可以是命名函式或者匿名函式(通常為 lambda 表示式)。

x = 1..10
each(x -> pow(x,2), x)

上例定義了一個 lambda 表示式: x -> pow(x,2),作為高階函式 each 的引數,來計算每一個元素的平方。

接下去的例子中,也會有其它的 lambda 函式案例。

3. 高階函式使用案例

3.1 cross 使用案例

3.1.1 將兩個向量或矩陣,兩兩組合作為引數來呼叫函式

cross 函式的虛擬碼如下:

for(i:0~(size(X)-1)){
   for(j:0~(size(Y)-1)){
       result[i,j]=<function>(X[i], Y[j]);
   }
}
return result;

以計算 協方差矩陣 為例,一般需要使用兩個 for 迴圈計算。程式碼如下:

def matlab_cov(mutable matt){
	nullFill!(matt,0.0)
	rowss,colss=matt.shape()
	msize = min(rowss, colss)
	df=matrix(float,msize,msize)
	for (r in 0..(msize-1)){
		for (c in 0..(msize-1)){
			df[r,c]=covar(matt[:,r],matt[:,c])
		}
	}
	return df
}

以上程式碼雖然邏輯簡單,但是冗長,表達能力較差,且易出錯。

在DolphinDB 中可以使用高階函式 cross  pcross 計算協方差矩陣:

cross(covar, matt)

3.1.2 計算股票兩兩之間的相關性

本例中,我們使用金融大資料開放社群 Tushare 的滬深股票 日線行情 資料,來計算股票間的兩兩相關性。

首先我們定義一個數據庫和表,來儲存滬深股票日線行情資料。相關語句如下:

login("admin","123456")
dbPath="dfs://tushare"
yearRange=date(2008.01M + 12*0..22)
if(existsDatabase(dbPath)){
	dropDatabase(dbPath)
}
columns1=`ts_code`trade_date`open`high`low`close`pre_close`change`pct_change`vol`amount
type1=`SYMBOL`NANOTIMESTAMP`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE
db=database(dbPath,RANGE,yearRange)
hushen_daily_line=db.createPartitionedTable(table(100000000:0,columns1,type1),`hushen_daily_line,`trade_date)
上面的表是按照   日線行情  裡的結構說明定義的。

定義好表結構後,如需獲取對應的資料,可前往 Tushare 平臺註冊賬戶,獲取 TOKEN,然後參考 案例指令碼 進行資料匯入操作。本案例使用 DolphinDB 的 Python API 獲取資料,使用者也可參考 Tushare 的說明文件的說明文件使用其它語言或庫。本例使用 2008 年到 2017 年的日線行情進行說明。

在計算兩兩相關性時,首先使用 exec + pivot by 生成股票回報率矩陣:

retMatrix=exec pct_change/100 as ret from daily_line pivot by trade_date, ts_code

exec  pivot by 是 DolphinDB 程式語言的特點之一。exec  select 的用法相同,但 select 語句僅可生成表, exec 語句可以生成向量。pivot by 用於重整維度,與 exec 一起使用時會生成一個矩陣。

呼叫高階函式 cross 生成股票兩兩相關性矩陣:

corrMatrix=cross(corr,retMatrix)

查詢和每隻股票相關性最高的 10 只股票:

syms=(exec count(*) from daily_line group by ts_code).ts_code
syms="C"+strReplace(syms, ".", "_")
mostCorrelated=select * from table(corrMatrix.columnNames() as ts_code, corrMatrix).rename!([`ts_code].append!(syms)).unpivot(`ts_code, syms).rename!(`ts_code`corr_ts_code`corr) context by ts_code having rank(corr,false) between 1:10

上面程式碼中,corrMatrix是一個矩陣,需要轉化為表做進一步處理,同時新增一列表示股票程式碼。使用 table 函式轉化成表後,通過 rename! 函式去修改表的列名。由於表的列名不能以數字開頭,故此例中,在 syms 前拼接了字元 "C",並將 syms 中的字元'.'轉化成'_'。

之後,對錶做 unpivot 操作,把多列的資料轉化成一列。

為了說明中間過程,我們將以上程式碼拆解出一箇中間步驟:

select * from table(corrMatrix.columnNames() as ts_code, corrMatrix).rename!([`ts_code].append!(syms)).unpivot(`ts_code, syms)

這一步生成結果如下:

ts_code   valueType  value
--------- ---------- -----------------
000001.SZ C600539_SH 1
000002.SZ C600539_SH 0.581235290880416
000004.SZ C600539_SH 0.277978963095669
000005.SZ C600539_SH 0.352580116619933
000006.SZ C600539_SH 0.5056164472398
......

這樣就得到了每隻股票與其它股票的相關係數。之後又使用 rename! 來修改列名,然後通過 context by 來按照 ts_code (股票程式碼)分組計算。每組中,查詢相關性最高的 10 只股票。

最終完整程式碼為:

login("admin","123456")
daily_line= loadTable("dfs://tushare","hushen_daily_line")

retMatrix=exec pct_change/100 as ret from daily_line pivot by trade_date,ts_code
corrMatrix=cross(corr,retMatrix)

syms=(exec count(*) from daily_line group by ts_code).ts_code
syms="C"+strReplace(syms, ".", "_")
mostCorrelated=select * from table(corrMatrix.columnNames() as ts_code, corrMatrix).rename!([`ts_code].append!(syms)).unpivot(`ts_code, syms).rename!(`ts_code`corr_ts_code`corr) context by ts_code having rank(corr,false) between 1:10

3.2 each 使用案例

某些場景需要把函式應用到指定引數中的每個元素。若不使用函式化程式設計,需要使用 for 迴圈。DolphinDB 提供的高階函式,例如 each, peach, loop, ploop 等,可以簡化程式碼。

3.2.1 獲取資料表各個列的 NULL 值個數

計算表 t 各列的 NULL 值個數,可以使用高階函式 each 

each(x->x.size() - x.count(), t.values())
在 DolphinDB 中,對於向量或矩陣,size 返回所有元素的個數,而 count 返回的是非 NULL 元素的個數。因此可以通過 size 和 count 的差值獲得 NULL 元素的個數。

其中,t.values() 返回一個tuple,每個元素為表 t 其中的一列。

3.2.2 去除表中存在 NULL 值的行

先通過如下程式碼生成表 t:

sym = take(`a`b`c, 110)
id = 1..100 join take(int(),10)
id2 =  take(int(),10) join 1..100
t = table(sym, id,id2)

可用以下兩種方法實現。

第一種是直接按行處理,檢查每一行是否存在 NULL 值,若存在就去除該行。解決方案如下:

t[each(x -> !(x.id == NULL || x.id2 == NULL), t)]

需要注意的是,按行處理表時,表的每一行是一個字典物件。這裡定義了一個 lambda 表示式來檢查空值。

若列數較多,不便列舉時,可以採用以下寫法:

t[each(x -> all(isValid(x.values())), t)]

上面程式碼中,x.values 獲取了該字典所有的值,然後通過 isValid 檢查 NULL 值,最後通過 all 將結果彙總,判斷該行是否包含 NULL 值。

當資料量較大時,上述指令碼執行效率較低。

DolphinDB 採用列式儲存,列操作較行操作具有更佳的效能。我們可以呼叫高階函式 each 對錶的每一列分別應用 isValid 函式,返回一個結果矩陣。通過 rowAnd 判斷矩陣的每一行是否存在 0 值。

程式碼如下:

t[each(isValid, t.values()).rowAnd()]

當資料量很大時,可能會產生如下報錯:

The number of cells in a matrix can't exceed 2 billions.

這是因為 each(isValid, t.values()) 生成的矩陣過大。為解決該問題,可以呼叫 reduce 進行迭代計算,遍歷檢查每一列是否存在 NULL 值。

t[reduce(def(x,y) -> x and isValid(y), t.values(), true)]

3.2.3 按行處理與按列處理效能比較案例

下例對錶的某個欄位進行如下處理:"aaaa_bbbb" 替換為 "bbbb_aaaa"。

先建立一個表 t:

t=table(take("aaaa_bbbb", 1000000) as str);

有兩種處理思路,可以按行處理或按列處理。

按行處理:

可以呼叫高階函式 each 遍歷每一行資料,切分後拼接。

each(x -> split(x, '_').reverse().concat('_'), t[`str])

按列處理:

pos = strpos(t[`str], "_")
substr(t[`str], pos+1)+"_"+t[`str].left(pos)

對比兩種方式的效能,可以看到使用高階函式 each 按行遍歷的時間在 2s300ms 左右,而按列處理的時間在 100ms 左右。因此按列處理效能更高。

完整程式碼和測試結果如下:

t=table(take("aaaa_bbbb", 1000000) as str);

timer r = each(x -> split(x, '_').reverse().concat('_'), t[`str])

timer {
	pos = strpos(t[`str], "_")
	r = substr(t[`str], pos+1)+"_"+t[`str].left(pos)
}

3.2.4 判斷兩張表內容是否相同

判斷兩張表 t1 和 t2 的資料是否完全相同,可以使用 each 高階函式,對錶的每列進行比較。

all(each(eqObj, t1.values(), t2.values()))

3.3 loop 使用案例

3.3.1 loop 與 each 的區別

高階函式 loop  each 相似, 區別在於結果的格式和型別。

each  func 的第一個返回值資料格式和型別決定了所有返回值資料格式和型別。而 loop 沒有這樣的限制,適用於函式返回值型別不同的情況。

def parse_signals(mutable tbl_value, value){
    kvs = split(value, ',');
    d = dict(STRING, STRING);
    for(kv in kvs) {
        sp = split(kv, ':');
        d[sp[0]] = sp[1];
    }
    insert into tbl_value values(date(d[`tradingday]), d[`signal_id], d[`index], d[`underlying], d[`symbol], int(d[`volume]), int(d[`buysell]), int(d[`openclose]), temporalParse(d[`signal_time], "HHmmssSSS"));
}
tbl_value=table(100:0, [`tradingday,`signal_id,`index,`underlying,`symbol,`volume,`buysell,`openclose,`signal_time],[DATE,SYMBOL,SYMBOL,SYMBOL,SYMBOL,INT,INT,INT,TIME]);

v1="tradingday:2020.06.03,signal_id:1,index:000300,underlying:510300,symbol:10002985,volume:2,buysell:0,openclose:0,signal_time:093000120";
v2="tradingday:2020.06.04,signal_id:2,index:000500,underlying:510050,symbol:10002986,volume:3,buysell:1,openclose:1,signal_time:093100120"
parse_signals(tbl_value, v1);
each(parse_signals{tbl_value}, [v1, v2]);

上面案例中,若使用 each 函式會報錯:

Not allowed to create void vector

each 作為高階函式,會並行執行多個計算任務。第一個任務的結果型別將決定整個函式的執行結果的型別。若單個任務返回一個 scalar,那麼 each 返回一個 vector;若單個任務返回 vector,那麼 each 返回一個 matrix;若單個任務返回字典 each,那麼 each 返回一個 table。

該問題中的 parse_signals 函式沒有任何返回值(也就是返回一個 NOTHING 標量),所以 each 試圖去建立一個型別為 void 的 vector,這在 DolphinDB 中是不被允許的。

 each 替換為 looploop 返回一個 tuple,每個單獨任務的返回值作為 tuple 的一個元素。

loop(parse_signals{tbl_value}, [v1, v2]);

3.3.2 匯入多個檔案

假設在一個目錄下,有多個結構相同的 csv 檔案,需將其匯入到同一個 DolphinDB 記憶體表中。可以呼叫高階函式 loop 來實現:

loop(loadText, fileDir + "/" + files(fileDir).filename).unionAll(false)

3.4 moving/rolling 使用案例

3.4.1 moving 案例

以當前記錄的 UpAvgPrice 和 DownAvgPrice 欄位值確定一個區間,取 close 欄位的前 20 個數計算其是否在區間 [DownAvgPrice, UpAvgPrice] 範圍內,並統計範圍內資料的百分比。

資料如下:

以 trade_date 為 2019.06.17 的記錄的 UpAvgPrice 和 DownAvgPrice 欄位確定一個區間 [11.5886533, 12.8061868],檢查該記錄的前 20 行 close 資料(即圖中標 1 這列)是否在對應區間中,若其中有 75% 的資料落在區間內,則 signal(圖中標 4)的值設為 true,否則為 false。

解決方案:

使用高階函式 moving。下例編寫自定義函式 rangeTest 對每個視窗的資料進行上述區間判斷,返回 true 或 false。

defg rangeTest(close, downlimit, uplimit){
  size = close.size() - 1
  return between(close.subarray(0:size), downlimit.last():uplimit.last()).sum() >= size*0.75
}

update t set signal = moving(rangeTest, [close, downAvgPrice, upAvgPrice], 21)
本例中,因為是計算前 20 行作為當期行的列數值,因而視窗需要包含前 20 條記錄和本條記錄,故視窗大小為 21 行。

上例呼叫   between  函式,來檢查每個元素是否在 a 和 b 之間(邊界包含在內)。

下例模擬行情資料,建立一個測試表 t:

t=table(rand("d"+string(1..n),n) as ts_code, nanotimestamp(2008.01.10+1..n) as trade_date, rand(n,n) as open, rand(n,n) as high, rand(n,n) as low, rand(n,n) as close, rand(n,n) as pre_close, rand(n,n) as change, rand(n,n) as pct_change, rand(n,n) as vol, rand(n,n) as amount, rand(n,n) as downAvgPrice, rand(n,n) as upAvgPrice, rand(1 0,n) as singna)

rolling 和 moving 類似,都將函式運算子應用到滑動視窗,進行視窗計算。兩者也有細微區別: rolling 可以指定步長 step,moving 的步長為 1;且兩者對空值的處理也不相同。詳情可參考 rolling 的空值處理

3.4.2 moving(sum) 和 msum 效能差距

雖然DolphinDB提供了高階函式moving,但是如果所要進行的計算可以用m系列函式(例如msum, mcount, mavg等)實現,請避免使用moving實現,這是因為m系列函式進行了優化,效能遠超moving。下面以 moving(sum) 和 msum為例:

x=1..1000000
timer moving(sum, x, 10)
timer msum(x, 10)

根據資料量的不同,msum 比 moving(sum) 計算耗時縮短 50 至 200 倍。

效能差距的主要原因如下:

  • 取數方式不同: msum 是一次性將資料讀入記憶體,無需為每次計算任務單獨分配記憶體; moving(sum) 每次計算都會生成一個子物件,每次計算都需要為子物件申請記憶體,計算完成後還需要進行記憶體回收。
  • msum 為增量計算,每次視窗計算都使用上一個視窗計算的結果。即直接加上當前視窗新合入的資料,並減去上一個視窗的第一條資料;而 moving(sum) 為全量計算,即每次計算都會累加視窗內的所有資料。

3.5 eachPre 使用案例

建立一個表 t,包含 sym 和 BidPrice 兩列:

t = table(take(`a`b`c`d`e ,100) as sym, rand(100.0,100) as bidPrice)

需要進行如下計算:

  • 1.生成新的一列 ln 用於儲存以下因子的計算結果:先計算當前的 bidPrice 值除以前 3 行 bidPrice 均值的結果(不包括當前行),然後取自然對數。
  • 2.基於列 ln,生成新列 clean 用於儲存以下因子的計算結果:計算 ln 的絕對值,若該值大於波動範圍閾值 F,則取上一條記錄的 ln 值,反之則認為當前報價正常,並保留當前的 ln 值。

根據 ln 列的因子計算規則,可以分析出該問題涉及到滑動視窗計算,視窗的大小為 3。參考 3.4.1 的 moving 案例,具體指令碼如下:

t2 = select *, log(bidPrice / prev(moving(avg, bidPrice,3))) as ln from t

由於內建函式 msum,mcount  mavg  moving 高階函式有更好的效能,可以將上述指令碼改寫如下:

//method 1
t2 = select *, log(bidPrice / prev(mavg(bidPrice,3))) as ln from t

//method 2
t22 = select *, log(bidPrice / mavg(prev(bidPrice),3)) as ln from t

此處呼叫 prev 函式獲取前一行的資料。

“先計算均值再移動結果” 和 “先移動列再計算均值” 效果等價的。唯一的區別是:表 t22 第三行會產生一個結果。

對於第二個資料處理要求,我們假設波動返回 F 為 0.02, 然後實現一個自定義函式 cleanFun 來實現其取值邏輯,如下:

F = 0.02
def cleanFun(F, x, y): iif(abs(x) > F, y, x)

這裡的引數 x 表示當前值,y 表示前一個值。然後呼叫高階函式 eachPre 來對相鄰元素兩兩計算,該函式等價於實現: F(X[0], pre), F(X[1], X[0]), ..., F(X[n], X[n-1])。對應指令碼如下:

t2[`clean] = eachPre(cleanFun{F}, t2[`ln])

完整程式碼如下:

F = 0.02
t = table(take(`a`b`c`d`e ,100) as sym, rand(100.0,100) as bidPrice)
t2 = select *, log(bidPrice / prev(mavg(bidPrice,3))) as ln from t
def cleanFun(F,x,y) : iif(abs(x) > F, y,x)
t2[`clean] = eachPre(cleanFun{F}, t2[`ln])

3.6 byRow 使用案例

計算矩陣每行最大值的下標。下例生成一個矩陣 m:

a1=2 3 4
a2=1 2 3
a3=1 4 5
a4=5 3 2
m = matrix(a1,a2,a3,a4)

一種思路是,對每行分別計算最大值的下標,可以直接呼叫 imax 函式實現。imax 在矩陣每列單獨計算,返回一個向量。

為求每行的計算結果,可以先對矩陣進行轉置操作,然後呼叫 imax 函式進行計算。

imax(m.transpose())

此外,DolphinDB 還提供了一個高階函式 byRow,對矩陣的每一行應用指定函式進行計算。使用該函式可以避免轉置操作。

byRow(imax, m)

3.7 segmentby 使用案例

高階函式 segmentby。其語法如下:

segmentby(func, funcArgs, segment)

根據 segment 引數取值確定分組方案,連續的相同值分為一組,進行分組計算。返回的結果與 segment 引數的長度相同。

x=1 2 3 0 3 2 1 4 5
y=1 1 1 -1 -1 -1 1 1 1
segmentby(cumsum,x,y);

上例中,根據 y 確定了 3 個分組:1 1 1, -1 -1 -1 和 1 1 1,由此把 x 也分為 3 組:1 2 3, 0 3 2 和 1 4 5,並將 cumsum 函式應用到 x 的每個分組,計算每個分組的累計和。

DolphinDB 還提供了內建函式 segment 用於在SQL語句中進行分組。與 segmentby 不同,它只返回分組資訊,而不對分組進行計算。

下例中,將表的某列資料按照給定閾值進行分組,連續小於或大於該閾值的資料被劃分為一組。連續大於該閾值的分組將保留組內最大值對應的記錄並輸出(若有重複值則輸出第一條)。

表內容如下圖所示,當閾值為 0.3 時,希望結果保留箭頭所指記錄:

表定義如下:

dated = 2021.09.01..2021.09.12
v = 0 0 0.3 0.3 0 0.5 0.3 0.7 0 0 0.3 0
t = table(dated as date, v)

將資料按照是否連續大於 minV 來分組時,可以使用函式 segment

segment(v>= minV)

在 SQL 中配合 context by 語句進行分組計算,通過 having 子句過濾分組的最大值。過濾結果可能存在多行,根據需求只保留第一行滿足結果的資料,此時可以通過指定 limit 子句限定輸出的記錄數。

完整的 SQL 查詢語句如下:

select * from t context by segment(v>= minV) having (v=max(v) and v>=minV) limit 1

3.8 pivot 使用案例

高階函式 pivot 可以在指定的二維維度上重組資料,結果為一個矩陣。

現有包含 4 列資料的表 t1:

syms=`600300`600400`600500$SYMBOL
sym=syms[0 0 0 0 0 0 0 1 1 1 1 1 1 1 2 2 2 2 2 2 2]
time=09:40:00+1 30 65 90 130 185 195 10 40 90 140 160 190 200 5 45 80 140 170 190 210
price=172.12 170.32 172.25 172.55 175.1 174.85 174.5 36.45 36.15 36.3 35.9 36.5 37.15 36.9 40.1 40.2 40.25 40.15 40.1 40.05 39.95
volume=100 * 10 3 7 8 25 6 10 4 5 1 2 8 6 10 2 2 5 5 4 4 3
t1=table(sym, time, price, volume);
t1;

將 t1 的資料依據 time 和 sym 維度進行資料重組,並且計算每分鐘股價的加權平均值,以交易量為權重。

stockprice=pivot(wavg, [t1.price, t1.volume], minute(t1.time), t1.sym)
stockprice.round(2)

3.9 contextby 使用案例

高階函式 contextby 可以將資料根據列欄位分組,並在組內呼叫指定函式進行計算。

sym=`IBM`IBM`IBM`MS`MS`MS
price=172.12 170.32 175.25 26.46 31.45 29.43
qty=5800 700 9000 6300 2100 5300
trade_date=2013.05.08 2013.05.06 2013.05.07 2013.05.08 2013.05.06 2013.05.07;
contextby(avg, price, sym);

contextby 亦可搭配 SQL 語句使用。下例呼叫 contextby 篩選出價格高於組內平均價的交易記錄:

t1=table(trade_date,sym,qty,price);
select trade_date, sym, qty, price from t1 where price > contextby(avg, price,sym);

3.10 call/unifiedCall 使用案例

對需要批量呼叫不同函式進行計算的場景,可以通過高階函式 call 或者 unifiedCall 配合高階函式 each/loop 實現。

call    unifiedCall  功能相同,但引數形式不同,詳情可參考使用者手冊。

下例中在部分應用中呼叫了函式 call 函式,該部分應用將向量 [1, 2, 3] 作為固定引數,在高階函式 each 中呼叫函式 sin  log

each(call{, 1..3},(sin,log));

此外,還可通過超程式設計方式呼叫函式。這裡會用到funcByName。上述例子可改寫為:

each(call{, 1..3},(funcByName('sin'),funcByName('log')));

或者,使用 makeCall/makeUnifiedCall 生成元程式碼,後續通過 eval 來執行:

each(eval, each(makeCall{,1..3},(sin,log)))

3.11 accumulate 使用案例

已知分鐘線資料如下,將某隻股票每成交約 150 萬股進行一次時間切分,最後得到時間視窗長度不等的若干條資料。具體的切分規則為:若某點的資料合入分組,可以縮小資料量和閾值(150 萬)間的差值,則加入該點,否則當前分組不合入該點的資料。示意圖如下:

構造測試資料如下:

timex = 13:03:00+(0..27)*60
volume = 288658 234804 182714 371986 265882 174778 153657 201388 175937 138388 169086 203013 261230 398871 692212 494300 581400 348160 250354 220064 218116 458865 673619 477386 454563 622870 458177 880992
t = table(timex as time, volume)

這裡自定義一個分組計算函式,將 volume 的累加,按上述切分規則,以 150 萬為閾值進行分組。

先定義一個分組函式,如下:

def caclCumVol(target, preResult, x){
 result = preResult + x
 if(result - target> target - preResult) return x
 else return result
}
accumulate(caclCumVol{1500000}, volume)

上述指令碼通過自定義函式 caclCumVol 計算 volume 的累加值,結果最接近 150 萬時劃分分組。新的分組將從下一個 volume 值開始重新累加。對應指令碼如下:

iif(accumulate(caclCumVol{1500000}, volume) ==volume, timex, NULL).ffill()

通過和 volume 比較,篩選出了每組的起始記錄。若中間結果存在空值,則呼叫 ffill 函式進行前值填充。將獲得的結果配合 group by 語句進行分組計算,查詢時,注意替換以上指令碼的 timex 為表的 time 欄位。

output = select sum(volume) as sum_volume, last(time) as endTime from t group by iif(accumulate(caclCumVol{1500000}, volume) ==volume, time, NULL).ffill() as startTime

完整程式碼如下:

timex = 13:03:00+(0..27)*60
volume = 288658 234804 182714 371986 265882 174778 153657 201388 175937 138388 169086 203013 261230 398871 692212 494300 581400 348160 250354 220064 218116 458865 673619 477386 454563 622870 458177 880992
t = table(timex as time, volume)

def caclCumVol(target, preResult, x){
 result = preResult + x
 if(result - target> target - preResult) return x
 else return result
}
output = select sum(volume) as sum_volume, last(time) as endTime from t group by iif(accumulate(caclCumVol{1500000}, volume)==volume, time, NULL).ffill() as startTime

3.12 window 使用案例

對錶中的某列資料進行以下計算,如果當前數值是前 5 個數據的最低值 (包括當前值),也是後 5 個最低值 (包括當前值),那麼標記是 1,否則是 0。

建立測試表 t:

t = table(rand(1..100,20) as id)

可以通過應用視窗函式 window,指定一個前後都為 5 的資料視窗,在該視窗內通過呼叫 min 函式計算最小值。注意:函式 window 的視窗邊界包含在視窗中。

實現指令碼如下:

select *, iif(id==window(min, id, -4:4), 1, 0) as mid from t

3.13 reduce 使用案例

上面的一些案例中,也有用到高階函式 reduce。虛擬碼如下:

result=<function>(init,X[0]);
for(i:1~size(X)){
  result=<function>(result, X[i]);
}
return result;

 accumulate 返回中間結果不同,reduce 只返回最後一個結果。

例如下面的計算階乘的例子:

r1 = reduce(mul, 1..10);
r2 = accumulate(mul, 1..10)[9];

最終 r1 和 r2 的結果是一樣的。

4. 部分應用案例

部分應用是指固定一個函式的部分引數,產生一個引數較少的函式。部分應用通常應用在對引數個數有特定要求的高階函式中。

4.1 提交帶有引數的作業

假設需要一個 定時任務,每日 0 點執行,用於計算某裝置前一日溫度指標的最大值。

假設裝置的溫度資訊儲存在分散式庫 dfs://dolphindb 下的表 sensor 中,其時間欄位為 ts,型別為 DATETIME。下例定義一個 getMaxTemperature 函式來實現計算過程,指令碼如下:

def getMaxTemperature(deviceID){
    maxTemp=exec max(temperature) from loadTable("dfs://dolphindb","sensor")
            where ID=deviceID ,date(ts) = today()-1
    return  maxTemp
}

定義計算函式後,可通過函式 scheduleJob 提交定時任務。由於函式 scheduleJob 不提供介面供任務函式進行傳參,而自定義函式 getMaxTemperature 以裝置 deviceID 作為引數,這裡可以通過部分應用來固定引數,從而產生一個沒有引數的函式。指令碼如下:

scheduleJob(`testJob, "getMaxTemperature", getMaxTemperature{1}, 00:00m, today(), today()+30, 'D');

上例只查詢了裝置號為 1 的裝置。

最終,完整程式碼如下:

def getMaxTemperature(deviceID){
    maxTemp=exec max(temperature) from loadTable("dfs://dolphindb","sensor")
            where ID=deviceID ,date(ts) = today()-1
    return  maxTemp
}

scheduleJob(`testJob, "getMaxTemperature", getMaxTemperature{1}, 00:00m, today(), today()+30, 'D');

4.2 獲取叢集其它節點作業資訊

在 DolphinDB 中提交定時作業後,可通過函式 getRecentJobs 來取得本地節點上最近幾個批處理作業的狀態。如檢視本地節點最近 3 個批處理作業狀態,可以用如下所示指令碼實現:

getRecentJobs(3);

若想獲取叢集上其它節點的作業資訊,需通過函式 rpc 來在指定的遠端節點上呼叫內建函式 getRecentJobs。如獲取節點別名為 P1-node1 的作業資訊,可以如下實現:

rpc("P1-node1",getRecentJobs)

如需獲取節點 P1-node1 上最近 3 個作業的資訊,通過如下指令碼實現會報錯:

rpc("P1-node1",getRecentJobs(3))

因為 rpc 函式第二個引數需要為函式(內建函式或使用者自定義函式)。這裡可以通過 DolphinDB 的部分應用,固定函式引數,來生成一個新的函式給 rpc 使用,如下:

rpc("P1-node1",getRecentJobs{3})

4.3 帶 “狀態” 的流計算訊息處理函式

在流計算中,使用者通常需要給定一個訊息處理函式,接受到訊息後進行處理。這個處理函式是一元函式或資料表。若為函式,用於處理訂閱資料,其唯一的引數是訂閱的資料,即不能包含狀態資訊。

下例通過部分應用定義訊息處理函式 cumulativeAverage,用於計算資料的累計均值。

定義流表 trades,對於其 price 欄位,每接受一條訊息,計算一次 price 的均值,並輸出到結果表 avgTable 中。指令碼如下:

share streamTable(10000:0,`time`symbol`price, [TIMESTAMP,SYMBOL,DOUBLE]) as trades
avgT=table(10000:0,[`avg_price],[DOUBLE])

def cumulativeAverage(mutable avgTable, mutable stat, trade){
   newVals = exec price from trade;

   for(val in newVals) {
      stat[0] = (stat[0] * stat[1] + val )/(stat[1] + 1)
      stat[1] += 1
      insert into avgTable values(stat[0])
   }
}

subscribeTable(tableName="trades", actionName="action30", handler=cumulativeAverage{avgT,0.0 0.0}, msgAsTable=true)

自定義函式 cumulativeAverage 的引數 avgTable 為計算結果的儲存表。stat 是一個向量,包含了兩個值:其中,stat[0] 用來表示當前的所有資料的平均值,stat[1] 表示資料個數。函式體的計算實現為:遍歷資料更新 stat 的值,並將新的計算結果插入表。

訂閱流表時,通過在 handler 中固定前兩個引數,實現帶 “狀態” 的訊息處理函式。

5. 金融場景相關案例

5.1 使用 map reduce,對 tick 資料降精度

下例中,使用 mr 函式(map reduce)將 tick 資料轉化為分鐘級資料。

在DolphinDB中,可以使用SQL語句基於 tick 資料計算分鐘級資料:

minuteQuotes=select avg(bid) as bid, avg(ofr) as ofr from t group by symbol,date,minute(time) as minute

但在資料量較大時,該實現效率低,耗時長。為提升效能,可以使用 DolphinDB 的分散式計算。

Map-Reduce 函式 mr 是 DolphinDB 通用分散式計算框架的核心功能。

完整程式碼如下:

login(`admin, `123456)
db = database("dfs://TAQ")
quotes = db.loadTable("quotes")

//create a new table quotes_minute
model=select  top 1 symbol,date, minute(time) as minute,bid,ofr from quotes where date=2007.08.01,symbol=`EBAY
if(existsTable("dfs://TAQ", "quotes_minute"))
db.dropTable("quotes_minute")
db.createPartitionedTable(model, "quotes_minute", `date`symbol)

//populate data for table quotes_minute
def saveMinuteQuote(t){
minuteQuotes=select avg(bid) as bid, avg(ofr) as ofr from t group by symbol,date,minute(time) as minute
loadTable("dfs://TAQ", "quotes_minute").append!(minuteQuotes)
return minuteQuotes.size()
}

ds = sqlDS(<select symbol,date,time,bid,ofr from quotes where date between 2007.08.01 : 2007.08.31>)
timer mr(ds, saveMinuteQuote, +)

5.2 資料回放和高頻因子計算

有狀態的因子,即因子的計算不僅用到當前資料,還會用到歷史資料。實現狀態因子的計算,一般包括這幾個步驟:

  1. 儲存本批次的訊息資料到歷史記錄;
  2. 根據更新後的歷史記錄,計算因子
  3. 將因子計算結果寫入輸出表中。如有必要,刪除未來不再需要的的歷史記錄。

DolphinDB 的訊息處理函式必須是單目函式,其唯一的引數就是當前的訊息。要儲存歷史狀態並在訊息處理函式中計算曆史資料,可以通過部分應用實現:對於多引數的訊息處理函式,保留一個引數用於接收訊息,固化其它所有的引數,用於儲存歷史狀態。這些固化引數只對訊息處理函式可見,不受其他應用的影響。

歷史狀態可儲存在記憶體表,字典或分割槽記憶體表中。本例將使用 DolphinDB流計算引擎 來處理 報價資料 通過字典儲存歷史狀態並計算因子。如需通過記憶體表或分散式記憶體表儲存歷史狀態,可以參考 實時計算高頻因子

定義狀態因子:計算當前第一檔賣價 (askPrice1) 與 30 個報價前的第一檔賣價的比值。

對應的因子計算函式 factorAskPriceRatio 實現如下:

defg factorAskPriceRatio(x){
	cnt = x.size()
	if(cnt < 31) return double()
	else return x[cnt - 1]/x[cnt - 31]
}

匯入資料建立對應的流表後,可以通過 replay 函式回放資料,模擬實時流計算的場景。

quotesData = loadText("/data/ddb/data/sampleQuotes.csv")

x=quotesData.schema().colDefs
share streamTable(100:0, x.name, x.typeString) as quotes1

由於這裡使用字典儲存歷史狀態,可以定義如下字典:

history = dict(STRING, ANY)

該字典的鍵值為 STRING 型別,值為元組(tuple)型別,儲存股票欄位,值為元組(tuple)型別,儲存賣價的歷史資料。

下例呼叫 dictUpdate! 函式更新字典,然後迴圈計算每隻股票的因子,並通過表儲存因子的計算結果。然後訂閱流表,通過資料回放向流表注入資料,每到來一條新資料都將觸發因子的計算。

訊息處理函式定義如下:

def factorHandler(mutable historyDict, mutable factors, msg){
	historyDict.dictUpdate!(function=append!, keys=msg.symbol, parameters=msg.askPrice1, initFunc=x->array(x.type(), 0, 512).append!(x))
	syms = msg.symbol.distinct()
	cnt = syms.size()
	v = array(DOUBLE, cnt)
	for(i in 0:cnt){
	    v[i] = factorAskPriceRatio(historyDict[syms[i]])
	}
	factors.tableInsert([take(now(), cnt), syms, v])
}

引數 historyDict 為儲存歷史狀態的字典,factors 是儲存計算結果的表。

完整程式碼如下:

quotesData = loadText("/data/ddb/data/sampleQuotes.csv")

defg factorAskPriceRatio(x){
	cnt = x.size()
	if(cnt < 31) return double()
	else return x[cnt - 1]/x[cnt - 31]
}
def factorHandler(mutable historyDict, mutable factors, msg){
	historyDict.dictUpdate!(function=append!, keys=msg.symbol, parameters=msg.askPrice1, initFunc=x->array(x.type(), 0, 512).append!(x))
	syms = msg.symbol.distinct()
	cnt = syms.size()
	v = array(DOUBLE, cnt)
	for(i in 0:cnt){
	    v[i] = factorAskPriceRatio(historyDict[syms[i]])
	}
	factors.tableInsert([take(now(), cnt), syms, v])
}

x=quotesData.schema().colDefs
share streamTable(100:0, x.name, x.typeString) as quotes1
history = dict(STRING, ANY)
share streamTable(100000:0, `timestamp`symbol`factor, [TIMESTAMP,SYMBOL,DOUBLE]) as factors
subscribeTable(tableName = "quotes1", offset=0, handler=factorHandler{history, factors}, msgAsTable=true, batchSize=3000, throttle=0.005)

replay(inputTables=quotesData, outputTables=quotes1, dateColumn=`date, timeColumn=`time)

檢視結果

select top 10 * from factors where isValid(factor)

5.3 基於字典的計算

下例建立表 orders,該表包含了一些簡單的股票資訊:

orders = table(`IBM`IBM`IBM`GOOG as SecID, 1 2 3 4 as Value, 4 5 6 7 as Vol)

建立一個字典。鍵為股票程式碼,值為從 orders 表中篩選出來的只包含該股票資訊的子表。

字典定義如下:

historyDict = dict(STRING, ANY)

然後通過函式 dictUpdate!,來更新每個鍵的值,實現如下:

historyDict.dictUpdate!(function=def(x,y){tableInsert(x,y);return x}, keys=orders.SecID, parameters=orders, initFunc=def(x){t = table(100:0, x.keys(), each(type, x.values())); tableInsert(t, x); return t})

可以把 dictUpdate! 的執行過程理解成,針對引數 parameters 遍歷,每個 parameters 作為引數,通過 function 去更新字典 (字典的 key 由 keys 指定的)。當字典中不存在對應的 key 時,會呼叫 initFunc 去初始化 key 對應的值。

這個例子中,字典的 key 是股票程式碼,value 是 orders 的子表。

這裡,我們使用 orders.SecID 作為 keys,在更新的函式引數中,我們定義了一個 lamda 函式將當前記錄插入到表中,如下:

def(x,y){tableInsert(x,y);return x}

注意此處使用 lamda 函式封裝了 tableInsert,而非指定 function=tableInsert。這是因為 tableInsert 的返回值不是一個 table,而是插入的條數,如果直接呼叫 tableInsert,在寫入第二條 IBM 對應的記錄時,會將字典中的值更新成插入的條數;寫入第三條 IBM 對應的記錄時,系統會丟擲異常。

初始條件下,historyDict 未賦值,可以通過指定 initFunc 引數對字典進行初始化賦值:

def(x){
  t = table(100:0, x.keys(), each(type, x.values()));
  tableInsert(t, x);
  return t
}

最終,完整程式碼如下:

orders = table(`IBM`IBM`IBM`GOOG as SecID, 1 2 3 4 as Value, 4 5 6 7 as Vol)
historyDict = dict(STRING, ANY)
historyDict.dictUpdate!(function=def(x,y){tableInsert(x,y);return x}, keys=orders.SecID, parameters=orders,
            initFunc=def(x){t = table(100:0, x.keys(), each(type, x.values())); tableInsert(t, x); return t})

執行後 historyDict 結果如下:

GOOG->
Vol Value SecID
--- ----- -----
7   4     GOOG

IBM->
Vol Value SecID
--- ----- -----
4   1     IBM
5   2     IBM
6   3     IBM

6. 機器學習相關案例

6.1 ols 殘差

建立樣本表 t 如下:

t=table(2020.11.01 2020.11.02 as date, `IBM`MSFT as ticker, 1.0 2 as past1, 2.0 2.5 as past3, 3.5 7 as past5, 4.2 2.4 as past10, 5.0 3.7 as past20, 5.5 6.2 as past30, 7.0 8.0 as past60)

計算每行資料和一個向量 benchX 的迴歸殘差,並將結果儲存到新列中。

向量 benchX 如下:

benchX = 10 15 7 8 9 1 2.0

DolphinDB 提供了最小二乘迴歸函式 ols

先將表中參與計算的欄位轉化成矩陣:

mt = matrix(t[`past1`past3`past5`past10`past20`past30`past60]).transpose()

然後定義殘差計算函式如下:

def(y, x) {
    return ols(y, x, true, 2).ANOVA.SS[1]
}

最後使用高階函式 each 與部分應用,對每行資料應用殘差計算函式:

t[`residual] = each(def(y, x){ return ols(y, x, true, 2).ANOVA.SS[1]}{,benchX}, mt)

完整程式碼如下:

t=table(2020.11.01 2020.11.02 as date, `IBM`MSFT as ticker, 1.0 2 as past1, 2.0 2.5 as past3, 3.5 7 as past5, 4.2 2.4 as past10, 5.0 3.7 as past20, 5.5 6.2 as past30, 7.0 8.0 as past60)

mt = matrix(t[`past1`past3`past5`past10`past20`past30`past60]).transpose()
t[`residual] = each(def(y, x){ return ols(y, x, true, 2).ANOVA.SS[1]}{,benchX}, mt)

7. 總結

除了上面提到的一些函式與高階函式。DolphinDB 還提供了豐富的 函式庫,包括數學函式、統計函式、分佈相關函式、假設檢驗函式、機器學習函式、邏輯函式、字串函式、時間函式、資料操作函式、視窗函式、高階函式、超程式設計、分散式計算函式、流計算函式、定時任務函式、效能監控函式、使用者許可權管理函式等。