DolphinDB 資料遷移與再平衡
資料遷移再平衡的目標是保證分割槽副本儘可能均衡分佈,副本位置影響著 IO 效能、節點負載,對於資料訪問延遲有著較大的影響。近來,越來越多的客戶對於資料容量或計算效能提出了更高要求,因而進行了叢集擴充套件,擴充套件之後如何在新的拓撲之中平衡資料,這是客戶必然面臨的問題。為此,我們撰寫了本文,總結資料遷移再平衡的常見場景與方法。
本教程適用於1.30.17、2.00.5及以上版本。
1 概述
DolphinDB 叢集支援擴充套件節點與儲存,以便增強計算能力與資料容量,那麼新增的節點與磁碟是沒有儲存任何資料的,所有資料依舊儲存在舊的節點與磁碟。也就是說,不同節點或不同磁碟上的資料分佈可能會存在較為嚴重的傾斜,這樣可能會帶來幾個問題:
分散式查詢計運算元任務無法分配到新增的節點,導致該節點計算資源無法被充分利用;
舊的磁碟 IO 壓力過大,新增的磁碟 IO 無法被充分利用。
再者,資料依舊儲存在舊的磁碟,如果舊的資料庫分割槽存在資料寫入或更新,那麼舊的磁碟空間佔滿的機率相對較大,一旦滿了會引發多種問題。
所以,節點與儲存變更之後的資料遷移與平衡便顯得非常必要。後面章節中,將會重點介紹節點與磁碟變更之後的資料遷移與平衡。
2 環境配置與資料模擬
2.1 硬體配置
測試總共使用三臺配置相同的伺服器,分別命名為 P1、P2、P3,具體硬體配置如表所示。
處理器 |
核數 |
記憶體 |
作業系統 |
硬碟 |
Intel(R) Xeon(R) Silver 4216 CPU @ 2.10GHz |
64 |
512 GB |
CentOS Linux release 7.9 |
SSD |
2.2 叢集配置
DolphinDB Server版本為2.00.9。
基於兩臺伺服器搭建雙副本普通叢集,假設兩臺伺服器名稱分別為 P1、P2,P1 部署一個控制節點、一個代理節點、一個數據節點,P2部署一個代理節點、一個數據節點,如表所示。
伺服器 |
IP |
埠 |
節點別名 |
節點型別 |
P1 |
192.168.100.4X |
8110 |
ctl1 |
控制節點 |
P1 |
192.168.100.4X |
8111 |
P1-agent |
代理節點 |
P1 |
192.168.100.4X |
8112 |
P1-dn1 |
資料節點 |
P2 |
192.168.100.4X |
8111 |
P2-agent |
代理節點 |
P2 |
192.168.100.4X |
8112 |
P2-dn1 |
資料節點 |
節點別名中,ctl 為英文單詞 controller 的縮寫,dn 為英文單詞 datanode 的縮寫。詳細配置情況請參照附件。
2.3 模擬資料
模擬資料為上交所 Level-1 快照資料,基於真實資料結構模擬2000只股票快照資料,基於 OLAP 與 TSDB 儲存引擎的建庫建表、資料模擬、資料插入指令碼如下:
model = table(1:0, `SecurityID`DateTime`PreClosePx`OpenPx`HighPx`LowPx`LastPx`Volume`Amount`BidPrice1`BidPrice2`BidPrice3`BidPrice4`BidPrice5`BidOrderQty1`BidOrderQty2`BidOrderQty3`BidOrderQty4`BidOrderQty5`OfferPrice1`OfferPrice2`OfferPrice3`OfferPrice4`OfferPrice5`OfferQty1`OfferQty2`OfferQty3`OfferQty4`OfferQty5, [SYMBOL, DATETIME, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, LONG, LONG, LONG, LONG, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, LONG, LONG, LONG, LONG])
// OLAP儲存引擎建庫建表
dbDate = database("", VALUE, 2020.06.01..2020.06.07)
dbSecurityID = database("", HASH, [SYMBOL, 10])
db = database("dfs://Level1", COMPO, [dbDate, dbSecurityID])
createPartitionedTable(db, model, `Snapshot, `DateTime`SecurityID)
// TSDB儲存引擎建庫建表
dbDate = database("", VALUE, 2020.06.01..2020.06.07)
dbSymbol = database("", HASH, [SYMBOL, 10])
db = database("dfs://Level1_TSDB", COMPO, [dbDate, dbSymbol], engine="TSDB")
createPartitionedTable(db, model, `Snapshot, `DateTime`SecurityID, sortColumns=`SecurityID`DateTime)
def mockHalfDayData(Date, StartTime) {
t_SecurityID = table(format(600001..602000, "000000") + ".SH" as SecurityID)
t_DateTime = table(concatDateTime(Date, StartTime + 1..2400 * 3) as DateTime)
t = cj(t_SecurityID, t_DateTime)
size = t.size()
return table(t.SecurityID as SecurityID, t.DateTime as DateTime, rand(100.0, size) as PreClosePx, rand(100.0, size) as OpenPx, rand(100.0, size) as HighPx, rand(100.0, size) as LowPx, rand(100.0, size) as LastPx, rand(10000, size) as Volume, rand(100000.0, size) as Amount, rand(100.0, size) as BidPrice1, rand(100.0, size) as BidPrice2, rand(100.0, size) as BidPrice3, rand(100.0, size) as BidPrice4, rand(100.0, size) as BidPrice5, rand(100000, size) as BidOrderQty1, rand(100000, size) as BidOrderQty2, rand(100000, size) as BidOrderQty3, rand(100000, size) as BidOrderQty4, rand(100000, size) as BidOrderQty5, rand(100.0, size) as OfferPrice1, rand(100.0, size) as OfferPrice2, rand(100.0, size) as OfferPrice3, rand(100.0, size) as OfferPrice4, rand(100.0, size) as OfferPrice5, rand(100000, size) as OfferQty1, rand(100000, size) as OfferQty2, rand(100000, size) as OfferQty3, rand(100000, size) as OfferQty4, rand(100000, size) as OfferQty5)
}
def mockData(DateVector, StartTimeVector) {
for(Date in DateVector) {
for(StartTime in StartTimeVector) {
data = mockHalfDayData(Date, StartTime)
// OLAP儲存引擎分散式表插入模擬資料
loadTable("dfs://Level1", "Snapshot").append!(data)
// TSDB儲存引擎分散式表插入模擬資料
loadTable("dfs://Level1_TSDB", "Snapshot").append!(data)
}
}
}
mockData(2020.06.01..2020.06.10, 09:30:00 13:00:00)
上述指令碼中,我們模擬了2020.06.01至2020.06.10共計十天的資料。基於 OLAP 與 TSDB 兩種儲存引擎的分割槽方案是一致的,均採用兩級分割槽,分割槽粒度為表級分割槽,一級按天作了 VALUE 分割槽,二級按 SYMBOL 型別作了10個 HASH 分割槽。各個資料節點分割槽數量統計如下:
select count(*) from pnodeRun(getAllChunks) where dfsPath like "/Level1%" group by site, type
返回結果:
site |
type |
count |
P1-dn1 |
0 |
4 |
P1-dn1 |
1 |
200 |
P2-dn1 |
0 |
4 |
P2-dn1 |
1 |
200 |
其中,type 欄位表示分割槽類別,0表示 File Chunk,1表示 Tablet Chunk。File Chunk 包含 domain、
.tbl 等檔案,一個庫對應一個 domain 檔案,一個表對應一個 tbl 檔案,儲存庫表結構相關的資訊;真正儲存資料的區塊我們稱之為 Tablet Chunk。因為配置了雙副本,且在資源允許情況下,副本優先部署在多臺物理伺服器,所以上述結果符合預期,兩個庫共計包含408個分割槽。
2.4 注意事項
由於資料遷移再平衡任務比較耗費物理資源,且對於正在寫入、修改、刪除操作的分割槽,由於分割槽鎖的佔用,會出現遷移失敗的情況;對於耗時較長的查詢計算任務,由於快取指向舊的分割槽路徑,可能會出現中途丟擲異常。這些屬於預期內的正常現象。
建議
無寫入、無查詢任務執行時進行資料遷移再平衡操作。
3 函式介紹
本教程使用到的再平衡與資料遷移相關函式介紹如下,以下所有函式均須在控制節點執行,僅管理員使用者擁有許可權。
3.1 rebalanceChunksAmongDataNodes
函式定義
rebalanceChunksAmongDataNodes([exec=false])
功能描述
叢集之中所有資料節點之間的分割槽再平衡。
引數含義
exec:預設值為 false,僅根據再平衡演算法預估分割槽遷移結果,並不真正執行;為 true,才會執行遷移。
平衡演算法
平衡演算法存在一些假定條件,諸如:
舊盤所有已使用空間均是被分割槽資料佔用;
每個分割槽大小一致,而實際可能存在偏差;
新盤所有空間均是用來儲存分割槽資料用的;
實際情況與假定條件存在一定的誤差,如:磁碟除了存放 DolphinDB 資料外存了不少其它內容,分割槽大小傾斜嚴重等,可能會導致再平衡結果不夠理想。初次執行再平衡後,舊盤、新盤磁碟空間、分割槽大小等均發生了變化,多次執行再平衡,可以在一定程度上優化再平衡效果。
3.2 rebalanceChunksWithinDataNode
函式定義
rebalanceChunksWithinDataNode(nodeAlias, [exec=false])
功能描述
叢集之中某個資料節點內部、不同磁碟之間的資料再平衡。
引數含義
nodeAlias:字串,表示節點別名。
exec:預設值為 false,僅根據再平衡演算法預估分割槽遷移結果,並不真正執行;為 true,才會執行遷移。
平衡演算法
同上一小節。
3.3 restoreDislocatedTablet
函式定義
restoreDislocatedTablet()
功能描述
當配置分割槽粒度為表級分割槽時,同一個分割槽的所有表將分佈在相同的節點下。當呼叫函式 rebalanceChunksAmongDataNodes 進行資料平衡時,若出現節點宕機或離線,可能出現同一個分割槽裡部分表的資料轉移成功,部分表的資料轉移失敗的情況,即同一個分割槽下的不同表會分佈在不同的節點。該函式可以修復此問題,將同一個分割槽裡的錶轉移到同一個節點下。
3.4 moveReplicas
函式定義
moveReplicas(srcNode, destNode, chunkId)
功能描述
將源節點上一個或多個分割槽副本遷移至目標節點。
引數含義
srcNode:字串,表示源節點別名。
destNode:字串,表示目標節點別名。
chunkId:字串/UUID 標量或向量,表示分割槽 ID。
3.5 moveChunksAcrossVolume
函式定義
moveChunksAcrossVolume(srcPath, destPath, chunkIds, [isDelSrc=true])
功能描述
將磁碟卷源路徑下一個或多個分割槽檔案轉移至目標路徑。
引數含義
srcPath:源分割槽檔案路徑。
destPath:目標分割槽檔案路徑。
chunkIds:分割槽 ID。
isDelSrc:拷貝成功後是否刪除源分割槽。
4 節點變更場景下的資料遷移
本章節針對節點擴容、縮容場景下的分割槽平衡或資料遷移以示例的形式展開介紹。
4.1 節點擴容
增加一臺伺服器,名稱為 P3,部署一個代理節點、一個數據節點,如表所示。
伺服器 |
IP |
埠 |
節點別名 |
節點型別 |
P3 |
192.168.100.4X |
8111 |
P3-agent |
代理節點 |
P3 |
192.168.100.4X |
8112 |
P3-dn1 |
資料節點 |
首先,在資料節點上執行以下命令:
rpc(getControllerAlias(), rebalanceChunksAmongDataNodes{ false })
返回預估分割槽遷移結果,如圖所示,預估 P1-dn1、P2-dn1 共有114個分割槽遷移至 P3-dn1。
然後,在資料節點上執行以下命令,真正地執行分割槽再平衡。
rpc(getControllerAlias(), rebalanceChunksAmongDataNodes{ true })
在資料節點上執行以下命令,檢視任務併發度與再平衡任務執行進度。
rpc(getControllerAlias(), getConfigure{ `dfsRebalanceConcurrency })
rpc(getControllerAlias(), getRecoveryTaskStatus)
任務執行進度如圖所示。可以看到,DeleteSource 欄位全部為 True,原因為對於資料節點之間分割槽再平衡,從源資料節點到目的資料節點複製完成後,該引數會被置為 True,源資料節點會刪除相應副本資訊。
Status欄位為 In-Progress 的任務數目表示控制節點發起任務時的併發度,通過 dfsRebalanceConcurrency 引數配置,預設為資料節點個數的兩倍。recoveryWorkers 引數表示資料節點執行任務時的併發度,預設為資料節點個數的兩倍。
Status 欄位為 Finished,表示該任務已經完成。等待所有任務執行完畢後,在資料節點上執行以下命令,檢視再平衡後各個資料節點分割槽數量統計。
select count(*) from pnodeRun(getAllChunks) group by site
返回結果:
site |
count |
P1-dn1 |
196 |
P2-dn1 |
98 |
P3-dn1 |
114 |
4.2 節點擴容時資料遷移效能
在資料節點上執行以下命令,統計再平衡整體耗時。
select max(FinishTime - StartTime) as maxDiffTime from rpc(getControllerAlias(), getRecoveryTaskStatus)
返回結果:89秒。114個分割槽佔用磁碟空間約17GB,節點之間再平衡時資料遷移速度約為200MB/s。
場景 |
是否跨伺服器 |
網路 |
硬碟 |
任務發起併發度 |
任務執行併發度 |
遷移速率(MB/s) |
節點之間分割槽遷移 |
是 |
萬兆 |
SSD |
6 |
6 |
200 |
4.3 節點縮容
我們將縮減上一小節中增加的伺服器 P3,縮減之前,需要將 P3-dn1 資料節點上的分割槽副本遷移至叢集中其它資料節點上。
自定義 moveChunks 函式,入參為一個源節點名稱,內部實現依賴 moveReplicas 函式,遷移源節點上所有分割槽副本至其它節點上。在資料節點上執行以下命令,執行遷移操作。
def moveChunks(srcNode) {
chunks = exec chunkId from pnodeRun(getAllChunks) where site = srcNode
allNodes = pnodeRun(getNodeAlias).node
for(chunk in chunks) {
destNode = allNodes[at(not allNodes in (exec site from pnodeRun(getAllChunks) where chunkId = chunk))].rand(1)[0]
print("From " + srcNode + ", to " + destNode + ", moving " + chunk)
rpc(getControllerAlias(), moveReplicas, srcNode, destNode, chunk)
}
}
srcNode = "P3-dn1"
moveChunks(srcNode)
在資料節點上執行以下命令,檢視遷移進度。
rpc(getControllerAlias(), getRecoveryTaskStatus)
等待所有任務執行完畢後,在資料節點上執行以下命令,檢視遷移後各個資料節點分割槽數量統計。
select count(*) from pnodeRun(getAllChunks) group by site
返回結果:
site |
count |
P1-dn1 |
204 |
P2-dn1 |
204 |
此時,P3-dn1 資料節點上已不包含任何分割槽副本,可以進行節點縮減。
5 磁碟變更場景下的資料遷移
本章節針對磁碟擴容、縮容場景下的分割槽平衡或資料遷移以示例的形式展開介紹。
5.1 磁碟擴容
對於每個資料節點增加一塊磁碟。配置檔案中 volumes 引數由
volumes=/ssd/ssd0/jtwang/chunkData
更改為:
volumes=/ssd/ssd0/jtwang/chunkData,/ssd/ssd1/jtwang/chunkData
在資料節點上執行以下命令,對於 P1-dn1 資料節點上各塊磁碟之間進行分割槽再平衡。
rpc(getControllerAlias(), rebalanceChunksWithinDataNode{ "P1-dn1", false })
rpc(getControllerAlias(), rebalanceChunksWithinDataNode{ "P1-dn1", true })
在資料節點上執行以下命令,檢視遷移進度。
rpc(getControllerAlias(), getRecoveryTaskStatus)
同樣地,在資料節點上執行以下命令,對於 P2-dn1 資料節點上各塊磁碟之間進行分割槽再平衡。
rpc(getControllerAlias(), rebalanceChunksWithinDataNode{ "P2-dn1", false })
rpc(getControllerAlias(), rebalanceChunksWithinDataNode{ "P2-dn1", true })
在資料節點上執行以下命令,檢視遷移進度。
rpc(getControllerAlias(), getRecoveryTaskStatus)
從圖中看,兩次再平衡共計遷移189個分割槽。可以看到,DeleteSource 欄位全部為 False,原因為對於資料節點內部分割槽遷移,從舊盤到新盤,分割槽仍然是在同一個資料節點,所以該引數一直為 False。
等待所有任務執行完畢後,在資料節點上執行以下命令,檢視遷移後各個資料節點各塊磁碟的分割槽數量統計。
def getDiskNo(path) {
size = path.size()
result = array(STRING, 0, size)
for(i in 0 : size) { append!(result, concat(split(path[i], "/")[0:5], "/")) }
return result
}
select count(*) from pnodeRun(getAllChunks) group by site, getDiskNo(path) as disk
返回結果:
site |
disk |
count |
P1-dn1 |
/ssd/ssd0/jtwang/chunkData |
108 |
P1-dn1 |
/ssd/ssd1/jtwang/chunkData |
96 |
P2-dn1 |
/ssd/ssd0/jtwang/chunkData |
111 |
P2-dn1 |
/ssd/ssd1/jtwang/chunkData |
93 |
5.2 磁碟擴容時資料遷移效能
P1-dn1 資料節點上各塊磁碟之間分割槽再平衡之後,在資料節點上執行以下命令,統計再平衡整體耗時。
select max(FinishTime - StartTime) as maxDiffTime from rpc(getControllerAlias(), getRecoveryTaskStatus)
返回結果:56秒。96個分割槽佔用磁碟空間約14GB,節點內部再平衡時資料遷移速度約為250MB/s。
場景 |
是否跨伺服器 |
網路 |
硬碟 |
任務發起併發度 |
任務執行併發度 |
遷移速率(MB/s) |
磁碟之間分割槽遷移 |
否 |
萬兆 |
SSD |
4 |
4 |
250 |
5.3 磁碟縮容
我們將縮減上一小節中增加的磁碟,縮減之前,需要將各個資料節點新增的磁碟上的分割槽副本遷移至其它磁碟。
在資料節點上執行以下命令,將 P1-dn1、P2-dn1 資料節點 ssd1 盤上的分割槽遷移至 ssd0 盤。
srcPath = "/ssd/ssd1/jtwang/chunkData/CHUNKS"
destPath = "/ssd/ssd0/jtwang/chunkData/CHUNKS"
node = "P1-dn1"
chunkIds = exec chunkId from pnodeRun(getAllChunks) where site = node, path like (srcPath + "%")
rpc(node, moveChunksAcrossVolume{ srcPath, destPath, chunkIds, isDelSrc=true })
node = "P2-dn1"
chunkIds = exec chunkId from pnodeRun(getAllChunks) where site = node, path like (srcPath + "%")
rpc(node, moveChunksAcrossVolume{ srcPath, destPath, chunkIds, isDelSrc=true })
在資料節點上執行以下命令,檢視遷移後各個資料節點各塊磁碟的分割槽數量統計。
select count(*) from pnodeRun(getAllChunks) group by site, getDiskNo(path) as disk
返回結果:
site |
disk |
count |
P1-dn1 |
/ssd/ssd0/jtwang/chunkData |
204 |
P2-dn1 |
/ssd/ssd0/jtwang/chunkData |
204 |
此時,各個資料節點 ssd1 盤上已不包含任何分割槽副本,可以進行磁碟縮容。
6 小結
節點與磁碟的擴容和縮容,是生產環境中比較常見的場景。通過對此類場景的模擬和解決方案的實現,我們展示了 DolphinDB 強大、便捷的資料遷移與平衡能力,解決了叢集擴充套件之後如何優化資源使用的問題。
附件
- DolphinDB 流計算應用:引擎級聯監測門禁異常狀態
- DolphinDB 資料遷移與再平衡
- DolphinDB 機器學習在物聯網行業的應用:實時資料異常率預警
- 如何提升 ETF 期權隱含波動率和希臘值的計算速度?
- 如何用 Prometheus 和 Grafana 實現叢集的監控預警
- DolphinDB 分散式表資料更新原理和效能介紹
- 時序資料庫 DolphinDB 執行緒簡介
- 中高頻多因子庫儲存最佳實踐
- 如何高效處理面板資料
- 效能提升30倍丨基於 DolphinDB 的 mytt 指標庫實現
- 如何使用VS2017編譯DolphinDB C API動態庫
- DolphinDB 函式化程式設計案例教程
- DolphinDB 版本相容性標準
- 更強大、更靈活、更全面丨一文搞懂DolphinDB視窗計算
- 從一次 SQL 查詢的全過程看 DolphinDB 的執行緒模型
- 支援事務,還是不支援事務?這是一個問題
- DolphinDB 使用者社群「AskDolphinDB」正式上線!!
- DolphinDB案例分享丨帆軟報表軟體如何連線DolphinDB資料來源
- 乾貨丨時序資料庫DolphinDB與Spark的效能對比測試報告
- 測試報告丨DolphinDB與Elasticserach在金融資料集上的效能對比測試