DolphinDB 函數化編程案例教程
DolphinDB支持函數化編程:函數對象可以作為高階函數的參數。這提高了代碼表達能力,可以簡化代碼,複雜的任務可以通過一行或幾行代碼完成。
本教程介紹了一些常見場景下的函數化編程案例,重點介紹 DolphinDB 的高階函數及其使用場景。
內容主要包括:
- 數據導入
- Lambda表達式
- 高階函數使用案例
- 部分應用案例
- 金融場景相關案例
- 機器學習相關案例
1. 數據導入
1.1 整型時間轉化為 TIME 格式並導入
CSV 數據文件中常用整數表示時間,如 “93100000” 表示 “9:31:00.000”。為了便於查詢分析,建議將這類數據轉換為時間類型,再存儲到 DolphinDB 數據庫中。
針對這種場景,可通過 loadTextEx
函數的 transform
參數將文本文件中待轉化的時間列指定為相應的數據類型。
本例中會用到 CSV 文件 candle_201801.csv,數據樣本如下:
symbol,exchange,cycle,tradingDay,date,time,open,high,low,close,volume,turnover,unixTime
000001,SZSE,1,20180102,20180102,93100000,13.35,13.39,13.35,13.38,2003635,26785576.72,1514856660000
000001,SZSE,1,20180102,20180102,93200000,13.37,13.38,13.33,13.33,867181
......
(1)建庫
用腳本創建如下分佈式數據庫(按天進行值分區):
login(`admin,`123456)
dataFilePath="/home/data/candle_201801.csv"
dbPath="dfs://DolphinDBdatabase"
db=database(dbPath,VALUE,2018.01.02..2018.01.30)
(2)建表
下面先通過 extractTextSchema
函數獲取數據文件的表結構。csv 文件中的 time 字段被識別為整型。若要將其存為 TIME 類型,可以通過 update 語句更新表結構將其轉換為 TIME 類型,然後用更新後的表結構來創建分佈式表。該分佈式表的分區列是 date 列。
schemaTB=extractTextSchema(dataFilePath)
update schemaTB set type="TIME" where name="time"
tb=table(1:0, schemaTB.name, schemaTB.type)
tb=db.createPartitionedTable(tb, `tb1, `date);
這裏通過
extractTextSchema
獲取表結構。用户也可以自定義表結構。
(3)導入數據
可以通過自定義函數 i2t 對時間列 time 進行預處理,將其轉換為 TIME 類型,並返回處理後的數據表。
def i2t(mutable t){
return t.replaceColumn!(`time, t.time.format("000000000").temporalParse("HHmmssSSS"))
}
請注意:在自定義函數體內對數據進行處理時,請儘量使用本地的修改(以!結尾的函數)來提升性能。
調用 loadTextEx
函數導入 csv 文件的數據到分佈式表,這裏指定 transform
參數為 i2t
函數,導入時會自動應用 i2t
函數處理數據。
tmpTB=loadTextEx(dbHandle=db, tableName=`tb1, partitionColumns=`date, filename=dataFilePath, transform=i2t);
(4)查詢數據
查看錶內前 2 行數據,可以看到結果符合預期。
select top 2 * from loadTable(dbPath,`tb1);
symbol exchange cycle tradingDay date time open high low close volume turnover unixTime
------ -------- ----- ---------- ---------- -------------- ----- ----- ----- ----- ------- ---------- -------------
000001 SZSE 1 2018.01.02 2018.01.02 09:31:00.000 13.35 13.39 13.35 13.38 2003635 2.678558E7 1514856660000
000001 SZSE 1 2018.01.02 2018.01.02 09:32:00.000 13.37 13.38 13.33 13.33 867181 1.158757E7 1514856720000
完整代碼如下:
login(`admin,`123456)
dataFilePath="/home/data/candle_201801.csv"
dbPath="dfs://DolphinDBdatabase"
db=database(dbPath,VALUE,2018.01.02..2018.01.30)
schemaTB=extractTextSchema(dataFilePath)
update schemaTB set type="TIME" where name="time"
tb=table(1:0,schemaTB.name,schemaTB.type)
tb=db.createPartitionedTable(tb,`tb1,`date);
def i2t(mutable t){
return t.replaceColumn!(`time,t.time.format("000000000").temporalParse("HHmmssSSS"))
}
tmpTB=loadTextEx(dbHandle=db,tableName=`tb1,partitionColumns=`date,filename=dataFilePath,transform=i2t);
關於文本導入的相關函數和案例,可以參考 DolphinDB數據導入教程
1.2 有納秒時間戳的文本導入
本例將以整數類型存儲的納秒級數據導入為NANOTIMESTAMP類型。本例使用文本文件 nx.txt,數據樣本如下:
SendingTimeInNano#securityID#origSendingTimeInNano#bidSize
1579510735948574000#27522#1575277200049000000#1
1579510735948606000#27522#1575277200049000000#2
...
每一行記錄通過字符'#'來分隔列,SendingTimeInNano 和 origSendingTimeInNano 用於存儲納秒時間戳。
(1)建庫建表
首先定義分佈式數據庫和表,腳本如下:
dbSendingTimeInNano = database(, VALUE, 2020.01.20..2020.02.22);
dbSecurityIDRange = database(, RANGE, 0..10001);
db = database("dfs://testdb", COMPO, [dbSendingTimeInNano, dbSecurityIDRange]);
nameCol = `SendingTimeInNano`securityID`origSendingTimeInNano`bidSize;
typeCol = [`NANOTIMESTAMP,`INT,`NANOTIMESTAMP,`INT];
schemaTb = table(1:0,nameCol,typeCol);
db = database("dfs://testdb");
nx = db.createPartitionedTable(schemaTb, `nx, `SendingTimeInNano`securityID);
上述腳本創建了一個 組合分區 的數據庫,然後根據文本的字段和類型創建了表 nx。
(2)導入數據
導入數據時,使用函數 nanotimestamp
,將文本中的整型轉化為 NANOTIMESTAMP 類型:
def dataTransform(mutable t){
return t.replaceColumn!(`SendingTimeInNano, nanotimestamp(t.SendingTimeInNano)).replaceColumn!(`origSendingTimeInNano, nanotimestamp(t.origSendingTimeInNano))
}
最終通過 loadTextEx
導入數據。
完整代碼如下:
dbSendingTimeInNano = database(, VALUE, 2020.01.20..2020.02.22);
dbSecurityIDRange = database(, RANGE, 0..10001);
db = database("dfs://testdb", COMPO, [dbSendingTimeInNano, dbSecurityIDRange]);
nameCol = `SendingTimeInNano`securityID`origSendingTimeInNano`bidSize;
typeCol = [`NANOTIMESTAMP,`INT,`NANOTIMESTAMP,`INT];
schemaTb = table(1:0,nameCol,typeCol);
db = database("dfs://testdb");
nx = db.createPartitionedTable(schemaTb, `nx, `SendingTimeInNano`securityID);
def dataTransform(mutable t){
return t.replaceColumn!(`SendingTimeInNano, nanotimestamp(t.SendingTimeInNano)).replaceColumn!(`origSendingTimeInNano, nanotimestamp(t.origSendingTimeInNano))
}
pt=loadTextEx(dbHandle=db,tableName=`nx , partitionColumns=`SendingTimeInNano`securityID,filename="nx.txt",delimiter='#',transform=dataTransform);
2. Lambda 表達式
DolphinDB 中可以創建自定義函數,可以是命名函數或者匿名函數(通常為 lambda 表達式)。
x = 1..10
each(x -> pow(x,2), x)
上例定義了一個 lambda 表達式: x -> pow(x,2)
,作為高階函數 each
的參數,來計算每一個元素的平方。
接下去的例子中,也會有其它的 lambda 函數案例。
3. 高階函數使用案例
3.1 cross 使用案例
3.1.1 將兩個向量或矩陣,兩兩組合作為參數來調用函數
cross
函數的偽代碼如下:
for(i:0~(size(X)-1)){
for(j:0~(size(Y)-1)){
result[i,j]=<function>(X[i], Y[j]);
}
}
return result;
以計算 協方差矩陣 為例,一般需要使用兩個 for 循環計算。代碼如下:
def matlab_cov(mutable matt){
nullFill!(matt,0.0)
rowss,colss=matt.shape()
msize = min(rowss, colss)
df=matrix(float,msize,msize)
for (r in 0..(msize-1)){
for (c in 0..(msize-1)){
df[r,c]=covar(matt[:,r],matt[:,c])
}
}
return df
}
以上代碼雖然邏輯簡單,但是宂長,表達能力較差,且易出錯。
在DolphinDB 中可以使用高階函數 cross
或 pcross
計算協方差矩陣:
cross(covar, matt)
3.1.2 計算股票兩兩之間的相關性
本例中,我們使用金融大數據開放社區 Tushare 的滬深股票 日線行情 數據,來計算股票間的兩兩相關性。
首先我們定義一個數據庫和表,來存儲滬深股票日線行情數據。相關語句如下:
login("admin","123456")
dbPath="dfs://tushare"
yearRange=date(2008.01M + 12*0..22)
if(existsDatabase(dbPath)){
dropDatabase(dbPath)
}
columns1=`ts_code`trade_date`open`high`low`close`pre_close`change`pct_change`vol`amount
type1=`SYMBOL`NANOTIMESTAMP`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE
db=database(dbPath,RANGE,yearRange)
hushen_daily_line=db.createPartitionedTable(table(100000000:0,columns1,type1),`hushen_daily_line,`trade_date)
上面的表是按照 日線行情 裏的結構説明定義的。
定義好表結構後,如需獲取對應的數據,可前往 Tushare 平台註冊賬户,獲取 TOKEN,然後參考 案例腳本 進行數據導入操作。本案例使用 DolphinDB 的 Python API 獲取數據,用户也可參考 Tushare 的説明文檔的説明文檔使用其它語言或庫。本例使用 2008 年到 2017 年的日線行情進行説明。
在計算兩兩相關性時,首先使用 exec + pivot by 生成股票回報率矩陣:
retMatrix=exec pct_change/100 as ret from daily_line pivot by trade_date, ts_code
exec
和 pivot by
是 DolphinDB 編程語言的特點之一。exec
與 select
的用法相同,但 select
語句僅可生成表, exec
語句可以生成向量。pivot by
用於重整維度,與 exec
一起使用時會生成一個矩陣。
調用高階函數 cross
生成股票兩兩相關性矩陣:
corrMatrix=cross(corr,retMatrix)
查詢和每隻股票相關性最高的 10 只股票:
syms=(exec count(*) from daily_line group by ts_code).ts_code
syms="C"+strReplace(syms, ".", "_")
mostCorrelated=select * from table(corrMatrix.columnNames() as ts_code, corrMatrix).rename!([`ts_code].append!(syms)).unpivot(`ts_code, syms).rename!(`ts_code`corr_ts_code`corr) context by ts_code having rank(corr,false) between 1:10
上面代碼中,corrMatrix是一個矩陣,需要轉化為表做進一步處理,同時新增一列表示股票代碼。使用 table 函數轉化成表後,通過 rename!
函數去修改表的列名。由於表的列名不能以數字開頭,故此例中,在 syms 前拼接了字符 "C",並將 syms 中的字符'.'轉化成'_'。
之後,對錶做 unpivot
操作,把多列的數據轉化成一列。
為了説明中間過程,我們將以上代碼拆解出一箇中間步驟:
select * from table(corrMatrix.columnNames() as ts_code, corrMatrix).rename!([`ts_code].append!(syms)).unpivot(`ts_code, syms)
這一步生成結果如下:
ts_code valueType value
--------- ---------- -----------------
000001.SZ C600539_SH 1
000002.SZ C600539_SH 0.581235290880416
000004.SZ C600539_SH 0.277978963095669
000005.SZ C600539_SH 0.352580116619933
000006.SZ C600539_SH 0.5056164472398
......
這樣就得到了每隻股票與其它股票的相關係數。之後又使用 rename!
來修改列名,然後通過 context by
來按照 ts_code
(股票代碼)分組計算。每組中,查詢相關性最高的 10 只股票。
最終完整代碼為:
login("admin","123456")
daily_line= loadTable("dfs://tushare","hushen_daily_line")
retMatrix=exec pct_change/100 as ret from daily_line pivot by trade_date,ts_code
corrMatrix=cross(corr,retMatrix)
syms=(exec count(*) from daily_line group by ts_code).ts_code
syms="C"+strReplace(syms, ".", "_")
mostCorrelated=select * from table(corrMatrix.columnNames() as ts_code, corrMatrix).rename!([`ts_code].append!(syms)).unpivot(`ts_code, syms).rename!(`ts_code`corr_ts_code`corr) context by ts_code having rank(corr,false) between 1:10
3.2 each 使用案例
某些場景需要把函數應用到指定參數中的每個元素。若不使用函數化編程,需要使用 for 循環。DolphinDB 提供的高階函數,例如 each
, peach
, loop
, ploop
等,可以簡化代碼。
3.2.1 獲取數據表各個列的 NULL 值個數
計算表 t 各列的 NULL 值個數,可以使用高階函數 each
。
each(x->x.size() - x.count(), t.values())
在 DolphinDB 中,對於向量或矩陣,size 返回所有元素的個數,而 count 返回的是非 NULL 元素的個數。因此可以通過 size 和 count 的差值獲得 NULL 元素的個數。
其中,t.values() 返回一個tuple,每個元素為表 t
其中的一列。
3.2.2 去除表中存在 NULL 值的行
先通過如下代碼生成表 t:
sym = take(`a`b`c, 110)
id = 1..100 join take(int(),10)
id2 = take(int(),10) join 1..100
t = table(sym, id,id2)
可用以下兩種方法實現。
第一種是直接按行處理,檢查每一行是否存在 NULL 值,若存在就去除該行。解決方案如下:
t[each(x -> !(x.id == NULL || x.id2 == NULL), t)]
需要注意的是,按行處理表時,表的每一行是一個字典對象。這裏定義了一個 lambda 表達式來檢查空值。
若列數較多,不便枚舉時,可以採用以下寫法:
t[each(x -> all(isValid(x.values())), t)]
上面代碼中,x.values
獲取了該字典所有的值,然後通過 isValid
檢查 NULL 值,最後通過 all
將結果彙總,判斷該行是否包含 NULL 值。
當數據量較大時,上述腳本運行效率較低。
DolphinDB 採用列式存儲,列操作較行操作具有更佳的性能。我們可以調用高階函數 each
對錶的每一列分別應用 isValid
函數,返回一個結果矩陣。通過 rowAnd
判斷矩陣的每一行是否存在 0 值。
代碼如下:
t[each(isValid, t.values()).rowAnd()]
當數據量很大時,可能會產生如下報錯:
The number of cells in a matrix can't exceed 2 billions.
這是因為 each(isValid, t.values())
生成的矩陣過大。為解決該問題,可以調用 reduce
進行迭代計算,遍歷檢查每一列是否存在 NULL 值。
t[reduce(def(x,y) -> x and isValid(y), t.values(), true)]
3.2.3 按行處理與按列處理性能比較案例
下例對錶的某個字段進行如下處理:"aaaa_bbbb" 替換為 "bbbb_aaaa"。
先創建一個表 t:
t=table(take("aaaa_bbbb", 1000000) as str);
有兩種處理思路,可以按行處理或按列處理。
按行處理:
可以調用高階函數 each
遍歷每一行數據,切分後拼接。
each(x -> split(x, '_').reverse().concat('_'), t[`str])
按列處理:
pos = strpos(t[`str], "_")
substr(t[`str], pos+1)+"_"+t[`str].left(pos)
對比兩種方式的性能,可以看到使用高階函數 each
按行遍歷的時間在 2s300ms 左右,而按列處理的時間在 100ms 左右。因此按列處理性能更高。
完整代碼和測試結果如下:
t=table(take("aaaa_bbbb", 1000000) as str);
timer r = each(x -> split(x, '_').reverse().concat('_'), t[`str])
timer {
pos = strpos(t[`str], "_")
r = substr(t[`str], pos+1)+"_"+t[`str].left(pos)
}
3.2.4 判斷兩張表內容是否相同
判斷兩張表 t1 和 t2 的數據是否完全相同,可以使用 each
高階函數,對錶的每列進行比較。
all(each(eqObj, t1.values(), t2.values()))
3.3 loop 使用案例
3.3.1 loop 與 each 的區別
高階函數 loop
與 each
相似, 區別在於結果的格式和類型。
each
中 func 的第一個返回值數據格式和類型決定了所有返回值數據格式和類型。而 loop
沒有這樣的限制,適用於函數返回值類型不同的情況。
def parse_signals(mutable tbl_value, value){
kvs = split(value, ',');
d = dict(STRING, STRING);
for(kv in kvs) {
sp = split(kv, ':');
d[sp[0]] = sp[1];
}
insert into tbl_value values(date(d[`tradingday]), d[`signal_id], d[`index], d[`underlying], d[`symbol], int(d[`volume]), int(d[`buysell]), int(d[`openclose]), temporalParse(d[`signal_time], "HHmmssSSS"));
}
tbl_value=table(100:0, [`tradingday,`signal_id,`index,`underlying,`symbol,`volume,`buysell,`openclose,`signal_time],[DATE,SYMBOL,SYMBOL,SYMBOL,SYMBOL,INT,INT,INT,TIME]);
v1="tradingday:2020.06.03,signal_id:1,index:000300,underlying:510300,symbol:10002985,volume:2,buysell:0,openclose:0,signal_time:093000120";
v2="tradingday:2020.06.04,signal_id:2,index:000500,underlying:510050,symbol:10002986,volume:3,buysell:1,openclose:1,signal_time:093100120"
parse_signals(tbl_value, v1);
each(parse_signals{tbl_value}, [v1, v2]);
上面案例中,若使用 each
函數會報錯:
Not allowed to create void vector
each
作為高階函數,會並行執行多個計算任務。第一個任務的結果類型將決定整個函數的運行結果的類型。若單個任務返回一個 scalar,那麼 each
返回一個 vector;若單個任務返回 vector,那麼 each
返回一個 matrix;若單個任務返回字典 each
,那麼 each
返回一個 table。
該問題中的 parse_signals 函數沒有任何返回值(也就是返回一個 NOTHING 標量),所以 each
試圖去創建一個類型為 void 的 vector,這在 DolphinDB 中是不被允許的。
將 each
替換為 loop
。loop
返回一個 tuple,每個單獨任務的返回值作為 tuple 的一個元素。
loop(parse_signals{tbl_value}, [v1, v2]);
3.3.2 導入多個文件
假設在一個目錄下,有多個結構相同的 csv 文件,需將其導入到同一個 DolphinDB 內存表中。可以調用高階函數 loop
來實現:
loop(loadText, fileDir + "/" + files(fileDir).filename).unionAll(false)
3.4 moving/rolling 使用案例
3.4.1 moving 案例
以當前記錄的 UpAvgPrice 和 DownAvgPrice 字段值確定一個區間,取 close 字段的前 20 個數計算其是否在區間 [DownAvgPrice, UpAvgPrice] 範圍內,並統計範圍內數據的百分比。
數據如下:
以 trade_date 為 2019.06.17 的記錄的 UpAvgPrice 和 DownAvgPrice 字段確定一個區間 [11.5886533, 12.8061868],檢查該記錄的前 20 行 close 數據(即圖中標 1 這列)是否在對應區間中,若其中有 75% 的數據落在區間內,則 signal(圖中標 4)的值設為 true,否則為 false。
解決方案:
使用高階函數 moving
。下例編寫自定義函數 rangeTest 對每個窗口的數據進行上述區間判斷,返回 true 或 false。
defg rangeTest(close, downlimit, uplimit){
size = close.size() - 1
return between(close.subarray(0:size), downlimit.last():uplimit.last()).sum() >= size*0.75
}
update t set signal = moving(rangeTest, [close, downAvgPrice, upAvgPrice], 21)
本例中,因為是計算前 20 行作為當期行的列數值,因而窗口需要包含前 20 條記錄和本條記錄,故窗口大小為 21 行。
上例調用between
函數,來檢查每個元素是否在 a 和 b 之間(邊界包含在內)。
下例模擬行情數據,創建一個測試表 t:
t=table(rand("d"+string(1..n),n) as ts_code, nanotimestamp(2008.01.10+1..n) as trade_date, rand(n,n) as open, rand(n,n) as high, rand(n,n) as low, rand(n,n) as close, rand(n,n) as pre_close, rand(n,n) as change, rand(n,n) as pct_change, rand(n,n) as vol, rand(n,n) as amount, rand(n,n) as downAvgPrice, rand(n,n) as upAvgPrice, rand(1 0,n) as singna)
rolling 和 moving 類似,都將函數運算符應用到滑動窗口,進行窗口計算。兩者也有細微區別: rolling
可以指定步長 step,moving 的步長為 1;且兩者對空值的處理也不相同。詳情可參考 rolling 的空值處理。
3.4.2 moving(sum) 和 msum 性能差距
雖然DolphinDB提供了高階函數moving,但是如果所要進行的計算可以用m系列函數(例如msum
, mcount
, mavg
等)實現,請避免使用moving實現,這是因為m系列函數進行了優化,性能遠超moving。下面以 moving(sum) 和 msum為例:
x=1..1000000
timer moving(sum, x, 10)
timer msum(x, 10)
根據數據量的不同,msum 比 moving(sum) 計算耗時縮短 50 至 200 倍。
性能差距的主要原因如下:
- 取數方式不同: msum 是一次性將數據讀入內存,無需為每次計算任務單獨分配內存; moving(sum) 每次計算都會生成一個子對象,每次計算都需要為子對象申請內存,計算完成後還需要進行內存回收。
- msum 為增量計算,每次窗口計算都使用上一個窗口計算的結果。即直接加上當前窗口新合入的數據,並減去上一個窗口的第一條數據;而 moving(sum) 為全量計算,即每次計算都會累加窗口內的所有數據。
3.5 eachPre 使用案例
創建一個表 t,包含 sym 和 BidPrice 兩列:
t = table(take(`a`b`c`d`e ,100) as sym, rand(100.0,100) as bidPrice)
需要進行如下計算:
- 1.生成新的一列 ln 用於存儲以下因子的計算結果:先計算當前的 bidPrice 值除以前 3 行 bidPrice 均值的結果(不包括當前行),然後取自然對數。
- 2.基於列 ln,生成新列 clean 用於存儲以下因子的計算結果:計算 ln 的絕對值,若該值大於波動範圍閾值 F,則取上一條記錄的 ln 值,反之則認為當前報價正常,並保留當前的 ln 值。
根據 ln 列的因子計算規則,可以分析出該問題涉及到滑動窗口計算,窗口的大小為 3。參考 3.4.1 的 moving 案例,具體腳本如下:
t2 = select *, log(bidPrice / prev(moving(avg, bidPrice,3))) as ln from t
由於內置函數 msum
,mcount
和 mavg
比 moving
高階函數有更好的性能,可以將上述腳本改寫如下:
//method 1
t2 = select *, log(bidPrice / prev(mavg(bidPrice,3))) as ln from t
//method 2
t22 = select *, log(bidPrice / mavg(prev(bidPrice),3)) as ln from t
此處調用 prev
函數獲取前一行的數據。
“先計算均值再移動結果” 和 “先移動列再計算均值” 效果等價的。唯一的區別是:表 t22 第三行會產生一個結果。
對於第二個數據處理要求,我們假設波動返回 F 為 0.02, 然後實現一個自定義函數 cleanFun
來實現其取值邏輯,如下:
F = 0.02
def cleanFun(F, x, y): iif(abs(x) > F, y, x)
這裏的參數 x 表示當前值,y 表示前一個值。然後調用高階函數 eachPre
來對相鄰元素兩兩計算,該函數等價於實現: F(X[0], pre), F(X[1], X[0]), ..., F(X[n], X[n-1])。對應腳本如下:
t2[`clean] = eachPre(cleanFun{F}, t2[`ln])
完整代碼如下:
F = 0.02
t = table(take(`a`b`c`d`e ,100) as sym, rand(100.0,100) as bidPrice)
t2 = select *, log(bidPrice / prev(mavg(bidPrice,3))) as ln from t
def cleanFun(F,x,y) : iif(abs(x) > F, y,x)
t2[`clean] = eachPre(cleanFun{F}, t2[`ln])
3.6 byRow 使用案例
計算矩陣每行最大值的下標。下例生成一個矩陣 m:
a1=2 3 4
a2=1 2 3
a3=1 4 5
a4=5 3 2
m = matrix(a1,a2,a3,a4)
一種思路是,對每行分別計算最大值的下標,可以直接調用 imax
函數實現。imax
在矩陣每列單獨計算,返回一個向量。
為求每行的計算結果,可以先對矩陣進行轉置操作,然後調用 imax
函數進行計算。
imax(m.transpose())
此外,DolphinDB 還提供了一個高階函數 byRow
,對矩陣的每一行應用指定函數進行計算。使用該函數可以避免轉置操作。
byRow(imax, m)
3.7 segmentby 使用案例
高階函數 segmentby
。其語法如下:
segmentby(func, funcArgs, segment)
根據 segment 參數取值確定分組方案,連續的相同值分為一組,進行分組計算。返回的結果與 segment 參數的長度相同。
x=1 2 3 0 3 2 1 4 5
y=1 1 1 -1 -1 -1 1 1 1
segmentby(cumsum,x,y);
上例中,根據 y 確定了 3 個分組:1 1 1, -1 -1 -1 和 1 1 1,由此把 x 也分為 3 組:1 2 3, 0 3 2 和 1 4 5,並將 cumsum 函數應用到 x 的每個分組,計算每個分組的累計和。
DolphinDB 還提供了內置函數 segment
用於在SQL語句中進行分組。與 segmentby 不同,它只返回分組信息,而不對分組進行計算。
下例中,將表的某列數據按照給定閾值進行分組,連續小於或大於該閾值的數據被劃分為一組。連續大於該閾值的分組將保留組內最大值對應的記錄並輸出(若有重複值則輸出第一條)。
表內容如下圖所示,當閾值為 0.3 時,希望結果保留箭頭所指記錄:
表定義如下:
dated = 2021.09.01..2021.09.12
v = 0 0 0.3 0.3 0 0.5 0.3 0.7 0 0 0.3 0
t = table(dated as date, v)
將數據按照是否連續大於 minV 來分組時,可以使用函數 segment
。
segment(v>= minV)
在 SQL 中配合 context by
語句進行分組計算,通過 having 子句過濾分組的最大值。過濾結果可能存在多行,根據需求只保留第一行滿足結果的數據,此時可以通過指定 limit 子句限定輸出的記錄數。
完整的 SQL 查詢語句如下:
select * from t context by segment(v>= minV) having (v=max(v) and v>=minV) limit 1
3.8 pivot 使用案例
高階函數 pivot
可以在指定的二維維度上重組數據,結果為一個矩陣。
現有包含 4 列數據的表 t1:
syms=`600300`600400`600500$SYMBOL
sym=syms[0 0 0 0 0 0 0 1 1 1 1 1 1 1 2 2 2 2 2 2 2]
time=09:40:00+1 30 65 90 130 185 195 10 40 90 140 160 190 200 5 45 80 140 170 190 210
price=172.12 170.32 172.25 172.55 175.1 174.85 174.5 36.45 36.15 36.3 35.9 36.5 37.15 36.9 40.1 40.2 40.25 40.15 40.1 40.05 39.95
volume=100 * 10 3 7 8 25 6 10 4 5 1 2 8 6 10 2 2 5 5 4 4 3
t1=table(sym, time, price, volume);
t1;
將 t1 的數據依據 time 和 sym 維度進行數據重組,並且計算每分鐘股價的加權平均值,以交易量為權重。
stockprice=pivot(wavg, [t1.price, t1.volume], minute(t1.time), t1.sym)
stockprice.round(2)
3.9 contextby 使用案例
高階函數 contextby
可以將數據根據列字段分組,並在組內調用指定函數進行計算。
sym=`IBM`IBM`IBM`MS`MS`MS
price=172.12 170.32 175.25 26.46 31.45 29.43
qty=5800 700 9000 6300 2100 5300
trade_date=2013.05.08 2013.05.06 2013.05.07 2013.05.08 2013.05.06 2013.05.07;
contextby(avg, price, sym);
contextby
亦可搭配 SQL 語句使用。下例調用 contextby 篩選出價格高於組內平均價的交易記錄:
t1=table(trade_date,sym,qty,price);
select trade_date, sym, qty, price from t1 where price > contextby(avg, price,sym);
3.10 call/unifiedCall 使用案例
對需要批量調用不同函數進行計算的場景,可以通過高階函數 call
或者 unifiedCall
配合高階函數 each
/loop
實現。
call
和unifiedCall
功能相同,但參數形式不同,詳情可參考用户手冊。
下例中在部分應用中調用了函數 call
函數,該部分應用將向量 [1, 2, 3] 作為固定參數,在高階函數 each
中調用函數 sin
與 log
。
each(call{, 1..3},(sin,log));
此外,還可通過元編程方式調用函數。這裏會用到funcByName
。上述例子可改寫為:
each(call{, 1..3},(funcByName('sin'),funcByName('log')));
或者,使用 makeCall
/makeUnifiedCall
生成元代碼,後續通過 eval
來執行:
each(eval, each(makeCall{,1..3},(sin,log)))
3.11 accumulate 使用案例
已知分鐘線數據如下,將某隻股票每成交約 150 萬股進行一次時間切分,最後得到時間窗口長度不等的若干條數據。具體的切分規則為:若某點的數據合入分組,可以縮小數據量和閾值(150 萬)間的差值,則加入該點,否則當前分組不合入該點的數據。示意圖如下:
構造測試數據如下:
timex = 13:03:00+(0..27)*60
volume = 288658 234804 182714 371986 265882 174778 153657 201388 175937 138388 169086 203013 261230 398871 692212 494300 581400 348160 250354 220064 218116 458865 673619 477386 454563 622870 458177 880992
t = table(timex as time, volume)
這裏自定義一個分組計算函數,將 volume 的累加,按上述切分規則,以 150 萬為閾值進行分組。
先定義一個分組函數,如下:
def caclCumVol(target, preResult, x){
result = preResult + x
if(result - target> target - preResult) return x
else return result
}
accumulate(caclCumVol{1500000}, volume)
上述腳本通過自定義函數 caclCumVol 計算 volume 的累加值,結果最接近 150 萬時劃分分組。新的分組將從下一個 volume 值開始重新累加。對應腳本如下:
iif(accumulate(caclCumVol{1500000}, volume) ==volume, timex, NULL).ffill()
通過和 volume 比較,篩選出了每組的起始記錄。若中間結果存在空值,則調用 ffill 函數進行前值填充。將獲得的結果配合 group by 語句進行分組計算,查詢時,注意替換以上腳本的 timex 為表的 time 字段。
output = select sum(volume) as sum_volume, last(time) as endTime from t group by iif(accumulate(caclCumVol{1500000}, volume) ==volume, time, NULL).ffill() as startTime
完整代碼如下:
timex = 13:03:00+(0..27)*60
volume = 288658 234804 182714 371986 265882 174778 153657 201388 175937 138388 169086 203013 261230 398871 692212 494300 581400 348160 250354 220064 218116 458865 673619 477386 454563 622870 458177 880992
t = table(timex as time, volume)
def caclCumVol(target, preResult, x){
result = preResult + x
if(result - target> target - preResult) return x
else return result
}
output = select sum(volume) as sum_volume, last(time) as endTime from t group by iif(accumulate(caclCumVol{1500000}, volume)==volume, time, NULL).ffill() as startTime
3.12 window 使用案例
對錶中的某列數據進行以下計算,如果當前數值是前 5 個數據的最低值 (包括當前值),也是後 5 個最低值 (包括當前值),那麼標記是 1,否則是 0。
創建測試表 t:
t = table(rand(1..100,20) as id)
可以通過應用窗口函數 window
,指定一個前後都為 5 的數據窗口,在該窗口內通過調用 min 函數計算最小值。注意:函數 window 的窗口邊界包含在窗口中。
實現腳本如下:
select *, iif(id==window(min, id, -4:4), 1, 0) as mid from t
3.13 reduce 使用案例
上面的一些案例中,也有用到高階函數 reduce
。偽代碼如下:
result=<function>(init,X[0]);
for(i:1~size(X)){
result=<function>(result, X[i]);
}
return result;
與 accumulate
返回中間結果不同,reduce
只返回最後一個結果。
例如下面的計算階乘的例子:
r1 = reduce(mul, 1..10);
r2 = accumulate(mul, 1..10)[9];
最終 r1 和 r2 的結果是一樣的。
4. 部分應用案例
部分應用是指固定一個函數的部分參數,產生一個參數較少的函數。部分應用通常應用在對參數個數有特定要求的高階函數中。
4.1 提交帶有參數的作業
假設需要一個 定時任務,每日 0 點執行,用於計算某設備前一日温度指標的最大值。
假設設備的温度信息存儲在分佈式庫 dfs://dolphindb
下的表 sensor
中,其時間字段為 ts
,類型為 DATETIME。下例定義一個 getMaxTemperature
函數來實現計算過程,腳本如下:
def getMaxTemperature(deviceID){
maxTemp=exec max(temperature) from loadTable("dfs://dolphindb","sensor")
where ID=deviceID ,date(ts) = today()-1
return maxTemp
}
定義計算函數後,可通過函數 scheduleJob
提交定時任務。由於函數 scheduleJob
不提供接口供任務函數進行傳參,而自定義函數 getMaxTemperature
以設備 deviceID
作為參數,這裏可以通過部分應用來固定參數,從而產生一個沒有參數的函數。腳本如下:
scheduleJob(`testJob, "getMaxTemperature", getMaxTemperature{1}, 00:00m, today(), today()+30, 'D');
上例只查詢了設備號為 1 的設備。
最終,完整代碼如下:
def getMaxTemperature(deviceID){
maxTemp=exec max(temperature) from loadTable("dfs://dolphindb","sensor")
where ID=deviceID ,date(ts) = today()-1
return maxTemp
}
scheduleJob(`testJob, "getMaxTemperature", getMaxTemperature{1}, 00:00m, today(), today()+30, 'D');
4.2 獲取集羣其它節點作業信息
在 DolphinDB 中提交定時作業後,可通過函數 getRecentJobs
來取得本地節點上最近幾個批處理作業的狀態。如查看本地節點最近 3 個批處理作業狀態,可以用如下所示腳本實現:
getRecentJobs(3);
若想獲取集羣上其它節點的作業信息,需通過函數 rpc
來在指定的遠程節點上調用內置函數 getRecentJobs
。如獲取節點別名為 P1-node1 的作業信息,可以如下實現:
rpc("P1-node1",getRecentJobs)
如需獲取節點 P1-node1 上最近 3 個作業的信息,通過如下腳本實現會報錯:
rpc("P1-node1",getRecentJobs(3))
因為 rpc
函數第二個參數需要為函數(內置函數或用户自定義函數)。這裏可以通過 DolphinDB 的部分應用,固定函數參數,來生成一個新的函數給 rpc
使用,如下:
rpc("P1-node1",getRecentJobs{3})
4.3 帶 “狀態” 的流計算消息處理函數
在流計算中,用户通常需要給定一個消息處理函數,接受到消息後進行處理。這個處理函數是一元函數或數據表。若為函數,用於處理訂閲數據,其唯一的參數是訂閲的數據,即不能包含狀態信息。
下例通過部分應用定義消息處理函數 cumulativeAverage
,用於計算數據的累計均值。
定義流表 trades,對於其 price
字段,每接受一條消息,計算一次 price
的均值,並輸出到結果表 avgTable 中。腳本如下:
share streamTable(10000:0,`time`symbol`price, [TIMESTAMP,SYMBOL,DOUBLE]) as trades
avgT=table(10000:0,[`avg_price],[DOUBLE])
def cumulativeAverage(mutable avgTable, mutable stat, trade){
newVals = exec price from trade;
for(val in newVals) {
stat[0] = (stat[0] * stat[1] + val )/(stat[1] + 1)
stat[1] += 1
insert into avgTable values(stat[0])
}
}
subscribeTable(tableName="trades", actionName="action30", handler=cumulativeAverage{avgT,0.0 0.0}, msgAsTable=true)
自定義函數 cumulativeAverage
的參數 avgTable 為計算結果的存儲表。stat 是一個向量,包含了兩個值:其中,stat[0] 用來表示當前的所有數據的平均值,stat[1] 表示數據個數。函數體的計算實現為:遍歷數據更新 stat 的值,並將新的計算結果插入表。
訂閲流表時,通過在 handler 中固定前兩個參數,實現帶 “狀態” 的消息處理函數。
5. 金融場景相關案例
5.1 使用 map reduce,對 tick 數據降精度
下例中,使用 mr
函數(map reduce)將 tick 數據轉化為分鐘級數據。
在DolphinDB中,可以使用SQL語句基於 tick 數據計算分鐘級數據:
minuteQuotes=select avg(bid) as bid, avg(ofr) as ofr from t group by symbol,date,minute(time) as minute
但在數據量較大時,該實現效率低,耗時長。為提升性能,可以使用 DolphinDB 的分佈式計算。
Map-Reduce 函數 mr
是 DolphinDB 通用分佈式計算框架的核心功能。
完整代碼如下:
login(`admin, `123456)
db = database("dfs://TAQ")
quotes = db.loadTable("quotes")
//create a new table quotes_minute
model=select top 1 symbol,date, minute(time) as minute,bid,ofr from quotes where date=2007.08.01,symbol=`EBAY
if(existsTable("dfs://TAQ", "quotes_minute"))
db.dropTable("quotes_minute")
db.createPartitionedTable(model, "quotes_minute", `date`symbol)
//populate data for table quotes_minute
def saveMinuteQuote(t){
minuteQuotes=select avg(bid) as bid, avg(ofr) as ofr from t group by symbol,date,minute(time) as minute
loadTable("dfs://TAQ", "quotes_minute").append!(minuteQuotes)
return minuteQuotes.size()
}
ds = sqlDS(<select symbol,date,time,bid,ofr from quotes where date between 2007.08.01 : 2007.08.31>)
timer mr(ds, saveMinuteQuote, +)
5.2 數據回放和高頻因子計算
有狀態的因子,即因子的計算不僅用到當前數據,還會用到歷史數據。實現狀態因子的計算,一般包括這幾個步驟:
- 保存本批次的消息數據到歷史記錄;
- 根據更新後的歷史記錄,計算因子
- 將因子計算結果寫入輸出表中。如有必要,刪除未來不再需要的的歷史記錄。
DolphinDB 的消息處理函數必須是單目函數,其唯一的參數就是當前的消息。要保存歷史狀態並在消息處理函數中計算曆史數據,可以通過部分應用實現:對於多參數的消息處理函數,保留一個參數用於接收消息,固化其它所有的參數,用於保存歷史狀態。這些固化參數只對消息處理函數可見,不受其他應用的影響。
歷史狀態可保存在內存表,字典或分區內存表中。本例將使用 DolphinDB流計算引擎 來處理 報價數據 通過字典保存歷史狀態並計算因子。如需通過內存表或分佈式內存表保存歷史狀態,可以參考 實時計算高頻因子。
定義狀態因子:計算當前第一檔賣價 (askPrice1) 與 30 個報價前的第一檔賣價的比值。
對應的因子計算函數 factorAskPriceRatio
實現如下:
defg factorAskPriceRatio(x){
cnt = x.size()
if(cnt < 31) return double()
else return x[cnt - 1]/x[cnt - 31]
}
導入數據創建對應的流表後,可以通過 replay
函數回放數據,模擬實時流計算的場景。
quotesData = loadText("/data/ddb/data/sampleQuotes.csv")
x=quotesData.schema().colDefs
share streamTable(100:0, x.name, x.typeString) as quotes1
由於這裏使用字典保存歷史狀態,可以定義如下字典:
history = dict(STRING, ANY)
該字典的鍵值為 STRING 類型,值為元組(tuple)類型,存儲股票字段,值為元組(tuple)類型,存儲賣價的歷史數據。
下例調用 dictUpdate!
函數更新字典,然後循環計算每隻股票的因子,並通過表存儲因子的計算結果。然後訂閲流表,通過數據回放向流表注入數據,每到來一條新數據都將觸發因子的計算。
消息處理函數定義如下:
def factorHandler(mutable historyDict, mutable factors, msg){
historyDict.dictUpdate!(function=append!, keys=msg.symbol, parameters=msg.askPrice1, initFunc=x->array(x.type(), 0, 512).append!(x))
syms = msg.symbol.distinct()
cnt = syms.size()
v = array(DOUBLE, cnt)
for(i in 0:cnt){
v[i] = factorAskPriceRatio(historyDict[syms[i]])
}
factors.tableInsert([take(now(), cnt), syms, v])
}
參數 historyDict 為保存歷史狀態的字典,factors 是存儲計算結果的表。
完整代碼如下:
quotesData = loadText("/data/ddb/data/sampleQuotes.csv")
defg factorAskPriceRatio(x){
cnt = x.size()
if(cnt < 31) return double()
else return x[cnt - 1]/x[cnt - 31]
}
def factorHandler(mutable historyDict, mutable factors, msg){
historyDict.dictUpdate!(function=append!, keys=msg.symbol, parameters=msg.askPrice1, initFunc=x->array(x.type(), 0, 512).append!(x))
syms = msg.symbol.distinct()
cnt = syms.size()
v = array(DOUBLE, cnt)
for(i in 0:cnt){
v[i] = factorAskPriceRatio(historyDict[syms[i]])
}
factors.tableInsert([take(now(), cnt), syms, v])
}
x=quotesData.schema().colDefs
share streamTable(100:0, x.name, x.typeString) as quotes1
history = dict(STRING, ANY)
share streamTable(100000:0, `timestamp`symbol`factor, [TIMESTAMP,SYMBOL,DOUBLE]) as factors
subscribeTable(tableName = "quotes1", offset=0, handler=factorHandler{history, factors}, msgAsTable=true, batchSize=3000, throttle=0.005)
replay(inputTables=quotesData, outputTables=quotes1, dateColumn=`date, timeColumn=`time)
查看結果
select top 10 * from factors where isValid(factor)
5.3 基於字典的計算
下例創建表 orders,該表包含了一些簡單的股票信息:
orders = table(`IBM`IBM`IBM`GOOG as SecID, 1 2 3 4 as Value, 4 5 6 7 as Vol)
創建一個字典。鍵為股票代碼,值為從 orders 表中篩選出來的只包含該股票信息的子表。
字典定義如下:
historyDict = dict(STRING, ANY)
然後通過函數 dictUpdate!
,來更新每個鍵的值,實現如下:
historyDict.dictUpdate!(function=def(x,y){tableInsert(x,y);return x}, keys=orders.SecID, parameters=orders, initFunc=def(x){t = table(100:0, x.keys(), each(type, x.values())); tableInsert(t, x); return t})
可以把 dictUpdate!
的執行過程理解成,針對參數 parameters 遍歷,每個 parameters 作為參數,通過 function 去更新字典 (字典的 key 由 keys 指定的)。當字典中不存在對應的 key 時,會調用 initFunc 去初始化 key 對應的值。
這個例子中,字典的 key 是股票代碼,value 是 orders 的子表。
這裏,我們使用 orders.SecID 作為 keys,在更新的函數參數中,我們定義了一個 lamda 函數將當前記錄插入到表中,如下:
def(x,y){tableInsert(x,y);return x}
注意此處使用 lamda 函數封裝了 tableInsert
,而非指定 function=tableInsert。這是因為 tableInsert
的返回值不是一個 table,而是插入的條數,如果直接調用 tableInsert
,在寫入第二條 IBM 對應的記錄時,會將字典中的值更新成插入的條數;寫入第三條 IBM 對應的記錄時,系統會拋出異常。
初始條件下,historyDict 未賦值,可以通過指定 initFunc 參數對字典進行初始化賦值:
def(x){
t = table(100:0, x.keys(), each(type, x.values()));
tableInsert(t, x);
return t
}
最終,完整代碼如下:
orders = table(`IBM`IBM`IBM`GOOG as SecID, 1 2 3 4 as Value, 4 5 6 7 as Vol)
historyDict = dict(STRING, ANY)
historyDict.dictUpdate!(function=def(x,y){tableInsert(x,y);return x}, keys=orders.SecID, parameters=orders,
initFunc=def(x){t = table(100:0, x.keys(), each(type, x.values())); tableInsert(t, x); return t})
執行後 historyDict 結果如下:
GOOG->
Vol Value SecID
--- ----- -----
7 4 GOOG
IBM->
Vol Value SecID
--- ----- -----
4 1 IBM
5 2 IBM
6 3 IBM
6. 機器學習相關案例
6.1 ols 殘差
創建樣本表 t 如下:
t=table(2020.11.01 2020.11.02 as date, `IBM`MSFT as ticker, 1.0 2 as past1, 2.0 2.5 as past3, 3.5 7 as past5, 4.2 2.4 as past10, 5.0 3.7 as past20, 5.5 6.2 as past30, 7.0 8.0 as past60)
計算每行數據和一個向量 benchX 的迴歸殘差,並將結果保存到新列中。
向量 benchX 如下:
benchX = 10 15 7 8 9 1 2.0
DolphinDB 提供了最小二乘迴歸函數 ols
。
先將表中參與計算的字段轉化成矩陣:
mt = matrix(t[`past1`past3`past5`past10`past20`past30`past60]).transpose()
然後定義殘差計算函數如下:
def(y, x) {
return ols(y, x, true, 2).ANOVA.SS[1]
}
最後使用高階函數 each
與部分應用,對每行數據應用殘差計算函數:
t[`residual] = each(def(y, x){ return ols(y, x, true, 2).ANOVA.SS[1]}{,benchX}, mt)
完整代碼如下:
t=table(2020.11.01 2020.11.02 as date, `IBM`MSFT as ticker, 1.0 2 as past1, 2.0 2.5 as past3, 3.5 7 as past5, 4.2 2.4 as past10, 5.0 3.7 as past20, 5.5 6.2 as past30, 7.0 8.0 as past60)
mt = matrix(t[`past1`past3`past5`past10`past20`past30`past60]).transpose()
t[`residual] = each(def(y, x){ return ols(y, x, true, 2).ANOVA.SS[1]}{,benchX}, mt)
7. 總結
除了上面提到的一些函數與高階函數。DolphinDB 還提供了豐富的 函數庫,包括數學函數、統計函數、分佈相關函數、假設檢驗函數、機器學習函數、邏輯函數、字符串函數、時間函數、數據操作函數、窗口函數、高階函數、元編程、分佈式計算函數、流計算函數、定時任務函數、性能監控函數、用户權限管理函數等。
- 如何使用VS2017編譯DolphinDB C API動態庫
- DolphinDB 函數化編程案例教程
- DolphinDB 版本兼容性標準
- 更強大、更靈活、更全面丨一文搞懂DolphinDB窗口計算
- 從一次 SQL 查詢的全過程看 DolphinDB 的線程模型
- 支持事務,還是不支持事務?這是一個問題
- DolphinDB 用户社區「AskDolphinDB」正式上線!!
- DolphinDB案例分享丨帆軟報表軟件如何連接DolphinDB數據源
- 乾貨丨時序數據庫DolphinDB與Spark的性能對比測試報吿
- 測試報吿丨DolphinDB與Elasticserach在金融數據集上的性能對比測試
- 測試報吿丨DolphinDB與Pandas對於大文本文件處理的性能對比
- DolphinDB如何實現EMQ X數據接入
- DolphinDB模塊複用教程
- DolphinDB的權限管理和安全
- DolphinDB內存管理詳解
- DolphinDB通用計算教程
- DolphinDB腳本語言的混合範式編程
- DolphinDB流計算引擎實現傳感器數據異常檢測
- DolphinDB內存表詳解
- 乾貨丨DolphinDB文本數據加載教程