DolphinDB 資料遷移與再平衡

語言: CN / TW / HK

資料遷移再平衡的目標是保證分割槽副本儘可能均衡分佈,副本位置影響著 IO 效能、節點負載,對於資料訪問延遲有著較大的影響。近來,越來越多的客戶對於資料容量或計算效能提出了更高要求,因而進行了叢集擴充套件,擴充套件之後如何在新的拓撲之中平衡資料,這是客戶必然面臨的問題。為此,我們撰寫了本文,總結資料遷移再平衡的常見場景與方法。

本教程適用於1.30.17、2.00.5及以上版本。

1 概述

DolphinDB 叢集支援擴充套件節點與儲存,以便增強計算能力與資料容量,那麼新增的節點與磁碟是沒有儲存任何資料的,所有資料依舊儲存在舊的節點與磁碟。也就是說,不同節點或不同磁碟上的資料分佈可能會存在較為嚴重的傾斜,這樣可能會帶來幾個問題:

  • 分散式查詢計運算元任務無法分配到新增的節點,導致該節點計算資源無法被充分利用;

  • 舊的磁碟 IO 壓力過大,新增的磁碟 IO 無法被充分利用。

再者,資料依舊儲存在舊的磁碟,如果舊的資料庫分割槽存在資料寫入或更新,那麼舊的磁碟空間佔滿的機率相對較大,一旦滿了會引發多種問題。

所以,節點與儲存變更之後的資料遷移與平衡便顯得非常必要。後面章節中,將會重點介紹節點與磁碟變更之後的資料遷移與平衡。

2 環境配置與資料模擬

2.1 硬體配置

測試總共使用三臺配置相同的伺服器,分別命名為 P1、P2、P3,具體硬體配置如表所示。

處理器

核數

記憶體

作業系統

硬碟

Intel(R) Xeon(R) Silver 4216 CPU @ 2.10GHz

64

512 GB

CentOS Linux release 7.9

SSD

2.2 叢集配置

DolphinDB Server版本為2.00.9。

基於兩臺伺服器搭建雙副本普通叢集,假設兩臺伺服器名稱分別為 P1、P2,P1 部署一個控制節點、一個代理節點、一個數據節點,P2部署一個代理節點、一個數據節點,如表所示。

伺服器

IP

節點別名

節點型別

P1

192.168.100.4X

8110

ctl1

控制節點

P1

192.168.100.4X

8111

P1-agent

代理節點

P1

192.168.100.4X

8112

P1-dn1

資料節點

P2

192.168.100.4X

8111

P2-agent

代理節點

P2

192.168.100.4X

8112

P2-dn1

資料節點

節點別名中,ctl 為英文單詞 controller 的縮寫,dn 為英文單詞 datanode 的縮寫。詳細配置情況請參照附件。

2.3 模擬資料

模擬資料為上交所 Level-1 快照資料,基於真實資料結構模擬2000只股票快照資料,基於 OLAP 與 TSDB 儲存引擎的建庫建表、資料模擬、資料插入指令碼如下:

model = table(1:0, `SecurityID`DateTime`PreClosePx`OpenPx`HighPx`LowPx`LastPx`Volume`Amount`BidPrice1`BidPrice2`BidPrice3`BidPrice4`BidPrice5`BidOrderQty1`BidOrderQty2`BidOrderQty3`BidOrderQty4`BidOrderQty5`OfferPrice1`OfferPrice2`OfferPrice3`OfferPrice4`OfferPrice5`OfferQty1`OfferQty2`OfferQty3`OfferQty4`OfferQty5, [SYMBOL, DATETIME, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, LONG, LONG, LONG, LONG, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, LONG, LONG, LONG, LONG])

// OLAP儲存引擎建庫建表
dbDate = database("", VALUE, 2020.06.01..2020.06.07)
dbSecurityID = database("", HASH, [SYMBOL, 10])
db = database("dfs://Level1", COMPO, [dbDate, dbSecurityID])
createPartitionedTable(db, model, `Snapshot, `DateTime`SecurityID)

// TSDB儲存引擎建庫建表
dbDate = database("", VALUE, 2020.06.01..2020.06.07)
dbSymbol = database("", HASH, [SYMBOL, 10])
db = database("dfs://Level1_TSDB", COMPO, [dbDate, dbSymbol], engine="TSDB")
createPartitionedTable(db, model, `Snapshot, `DateTime`SecurityID, sortColumns=`SecurityID`DateTime)

def mockHalfDayData(Date, StartTime) {
    t_SecurityID = table(format(600001..602000, "000000") + ".SH" as SecurityID)
    t_DateTime = table(concatDateTime(Date, StartTime + 1..2400 * 3) as DateTime)
    t = cj(t_SecurityID, t_DateTime)
    size = t.size()
    return  table(t.SecurityID as SecurityID, t.DateTime as DateTime, rand(100.0, size) as PreClosePx, rand(100.0, size) as OpenPx, rand(100.0, size) as HighPx, rand(100.0, size) as LowPx, rand(100.0, size) as LastPx, rand(10000, size) as Volume, rand(100000.0, size) as Amount, rand(100.0, size) as BidPrice1, rand(100.0, size) as BidPrice2, rand(100.0, size) as BidPrice3, rand(100.0, size) as BidPrice4, rand(100.0, size) as BidPrice5, rand(100000, size) as BidOrderQty1, rand(100000, size) as BidOrderQty2, rand(100000, size) as BidOrderQty3, rand(100000, size) as BidOrderQty4, rand(100000, size) as BidOrderQty5, rand(100.0, size) as OfferPrice1, rand(100.0, size) as OfferPrice2, rand(100.0, size) as OfferPrice3, rand(100.0, size) as OfferPrice4, rand(100.0, size) as OfferPrice5, rand(100000, size) as OfferQty1, rand(100000, size) as OfferQty2, rand(100000, size) as OfferQty3, rand(100000, size) as OfferQty4, rand(100000, size) as OfferQty5)
}

def mockData(DateVector, StartTimeVector) {
    for(Date in DateVector) {
        for(StartTime in StartTimeVector) {
            data = mockHalfDayData(Date, StartTime)
 
            // OLAP儲存引擎分散式表插入模擬資料
            loadTable("dfs://Level1", "Snapshot").append!(data)
  
            // TSDB儲存引擎分散式表插入模擬資料
            loadTable("dfs://Level1_TSDB", "Snapshot").append!(data)
        }   
    }
}

mockData(2020.06.01..2020.06.10, 09:30:00 13:00:00)

上述指令碼中,我們模擬了2020.06.01至2020.06.10共計十天的資料。基於 OLAP 與 TSDB 兩種儲存引擎的分割槽方案是一致的,均採用兩級分割槽,分割槽粒度為表級分割槽,一級按天作了 VALUE 分割槽,二級按 SYMBOL 型別作了10個 HASH 分割槽。各個資料節點分割槽數量統計如下:

select count(*) from pnodeRun(getAllChunks) where dfsPath like "/Level1%" group by site, type

返回結果:

site

type

count

P1-dn1

0

4

P1-dn1

1

200

P2-dn1

0

4

P2-dn1

1

200

其中,type 欄位表示分割槽類別,0表示 File Chunk,1表示 Tablet Chunk。File Chunk 包含 domain、

.tbl 等檔案,一個庫對應一個 domain 檔案,一個表對應一個 tbl 檔案,儲存庫表結構相關的資訊;真正儲存資料的區塊我們稱之為 Tablet Chunk。因為配置了雙副本,且在資源允許情況下,副本優先部署在多臺物理伺服器,所以上述結果符合預期,兩個庫共計包含408個分割槽。

2.4 注意事項

由於資料遷移再平衡任務比較耗費物理資源,且對於正在寫入、修改、刪除操作的分割槽,由於分割槽鎖的佔用,會出現遷移失敗的情況;對於耗時較長的查詢計算任務,由於快取指向舊的分割槽路徑,可能會出現中途丟擲異常。這些屬於預期內的正常現象。

建議

無寫入、無查詢任務執行時進行資料遷移再平衡操作。

3 函式介紹

本教程使用到的再平衡與資料遷移相關函式介紹如下,以下所有函式均須在控制節點執行,僅管理員使用者擁有許可權。

3.1 rebalanceChunksAmongDataNodes

函式定義

rebalanceChunksAmongDataNodes([exec=false])

功能描述

叢集之中所有資料節點之間的分割槽再平衡。

引數含義

exec:預設值為 false,僅根據再平衡演算法預估分割槽遷移結果,並不真正執行;為 true,才會執行遷移。

平衡演算法

平衡演算法存在一些假定條件,諸如:

  • 舊盤所有已使用空間均是被分割槽資料佔用;

  • 每個分割槽大小一致,而實際可能存在偏差;

  • 新盤所有空間均是用來儲存分割槽資料用的;

實際情況與假定條件存在一定的誤差,如:磁碟除了存放 DolphinDB 資料外存了不少其它內容,分割槽大小傾斜嚴重等,可能會導致再平衡結果不夠理想。初次執行再平衡後,舊盤、新盤磁碟空間、分割槽大小等均發生了變化,多次執行再平衡,可以在一定程度上優化再平衡效果。

3.2 rebalanceChunksWithinDataNode

函式定義

rebalanceChunksWithinDataNode(nodeAlias, [exec=false])

功能描述

叢集之中某個資料節點內部、不同磁碟之間的資料再平衡。

引數含義

nodeAlias:字串,表示節點別名。

exec:預設值為 false,僅根據再平衡演算法預估分割槽遷移結果,並不真正執行;為 true,才會執行遷移。

平衡演算法

同上一小節。

3.3 restoreDislocatedTablet

函式定義

restoreDislocatedTablet()

功能描述

當配置分割槽粒度為表級分割槽時,同一個分割槽的所有表將分佈在相同的節點下。當呼叫函式 rebalanceChunksAmongDataNodes 進行資料平衡時,若出現節點宕機或離線,可能出現同一個分割槽裡部分表的資料轉移成功,部分表的資料轉移失敗的情況,即同一個分割槽下的不同表會分佈在不同的節點。該函式可以修復此問題,將同一個分割槽裡的錶轉移到同一個節點下。

3.4 moveReplicas

函式定義

moveReplicas(srcNode, destNode, chunkId)

功能描述

將源節點上一個或多個分割槽副本遷移至目標節點。

引數含義

srcNode:字串,表示源節點別名。

destNode:字串,表示目標節點別名。

chunkId:字串/UUID 標量或向量,表示分割槽 ID。

3.5 moveChunksAcrossVolume

函式定義

moveChunksAcrossVolume(srcPath, destPath, chunkIds, [isDelSrc=true])

功能描述

將磁碟卷源路徑下一個或多個分割槽檔案轉移至目標路徑。

引數含義

srcPath:源分割槽檔案路徑。

destPath:目標分割槽檔案路徑。

chunkIds:分割槽 ID。

isDelSrc:拷貝成功後是否刪除源分割槽。

4 節點變更場景下的資料遷移

本章節針對節點擴容、縮容場景下的分割槽平衡或資料遷移以示例的形式展開介紹。

4.1 節點擴容

增加一臺伺服器,名稱為 P3,部署一個代理節點、一個數據節點,如表所示。

伺服器

IP

節點別名

節點型別

P3

192.168.100.4X

8111

P3-agent

代理節點

P3

192.168.100.4X

8112

P3-dn1

資料節點

首先,在資料節點上執行以下命令:

rpc(getControllerAlias(), rebalanceChunksAmongDataNodes{ false })

返回預估分割槽遷移結果,如圖所示,預估 P1-dn1、P2-dn1 共有114個分割槽遷移至 P3-dn1。

然後,在資料節點上執行以下命令,真正地執行分割槽再平衡。

rpc(getControllerAlias(), rebalanceChunksAmongDataNodes{ true })

在資料節點上執行以下命令,檢視任務併發度與再平衡任務執行進度。

rpc(getControllerAlias(), getConfigure{ `dfsRebalanceConcurrency })
rpc(getControllerAlias(), getRecoveryTaskStatus)

任務執行進度如圖所示。可以看到,DeleteSource 欄位全部為 True,原因為對於資料節點之間分割槽再平衡,從源資料節點到目的資料節點複製完成後,該引數會被置為 True,源資料節點會刪除相應副本資訊。

Status欄位為 In-Progress 的任務數目表示控制節點發起任務時的併發度,通過 dfsRebalanceConcurrency 引數配置,預設為資料節點個數的兩倍。recoveryWorkers 引數表示資料節點執行任務時的併發度,預設為資料節點個數的兩倍。

Status 欄位為 Finished,表示該任務已經完成。等待所有任務執行完畢後,在資料節點上執行以下命令,檢視再平衡後各個資料節點分割槽數量統計。

select count(*) from pnodeRun(getAllChunks) group by site

返回結果:

site

count

P1-dn1

196

P2-dn1

98

P3-dn1

114

4.2 節點擴容時資料遷移效能

在資料節點上執行以下命令,統計再平衡整體耗時。

select max(FinishTime - StartTime) as maxDiffTime from rpc(getControllerAlias(), getRecoveryTaskStatus)

返回結果:89秒。114個分割槽佔用磁碟空間約17GB,節點之間再平衡時資料遷移速度約為200MB/s。

場景

是否跨伺服器

網路

硬碟

任務發起併發度

任務執行併發度

遷移速率(MB/s)

節點之間分割槽遷移

萬兆

SSD

6

6

200

4.3 節點縮容

我們將縮減上一小節中增加的伺服器 P3,縮減之前,需要將 P3-dn1 資料節點上的分割槽副本遷移至叢集中其它資料節點上。

自定義 moveChunks 函式,入參為一個源節點名稱,內部實現依賴 moveReplicas 函式,遷移源節點上所有分割槽副本至其它節點上。在資料節點上執行以下命令,執行遷移操作。

def moveChunks(srcNode) {
    chunks = exec chunkId from pnodeRun(getAllChunks) where site = srcNode
    allNodes = pnodeRun(getNodeAlias).node
    for(chunk in chunks) {
        destNode = allNodes[at(not allNodes in (exec site from pnodeRun(getAllChunks) where chunkId = chunk))].rand(1)[0]
        print("From " + srcNode + ", to " + destNode + ", moving " + chunk)
        rpc(getControllerAlias(), moveReplicas, srcNode, destNode, chunk)
    }
}

srcNode = "P3-dn1"
moveChunks(srcNode)

在資料節點上執行以下命令,檢視遷移進度。

rpc(getControllerAlias(), getRecoveryTaskStatus)

等待所有任務執行完畢後,在資料節點上執行以下命令,檢視遷移後各個資料節點分割槽數量統計。

select count(*) from pnodeRun(getAllChunks) group by site

返回結果:

site

count

P1-dn1

204

P2-dn1

204

此時,P3-dn1 資料節點上已不包含任何分割槽副本,可以進行節點縮減。

5 磁碟變更場景下的資料遷移

本章節針對磁碟擴容、縮容場景下的分割槽平衡或資料遷移以示例的形式展開介紹。

5.1 磁碟擴容

對於每個資料節點增加一塊磁碟。配置檔案中 volumes 引數由

volumes=/ssd/ssd0/jtwang/chunkData

更改為:

volumes=/ssd/ssd0/jtwang/chunkData,/ssd/ssd1/jtwang/chunkData

在資料節點上執行以下命令,對於 P1-dn1 資料節點上各塊磁碟之間進行分割槽再平衡。

rpc(getControllerAlias(), rebalanceChunksWithinDataNode{ "P1-dn1", false })
rpc(getControllerAlias(), rebalanceChunksWithinDataNode{ "P1-dn1", true })

在資料節點上執行以下命令,檢視遷移進度。

rpc(getControllerAlias(), getRecoveryTaskStatus)

同樣地,在資料節點上執行以下命令,對於 P2-dn1 資料節點上各塊磁碟之間進行分割槽再平衡。

rpc(getControllerAlias(), rebalanceChunksWithinDataNode{ "P2-dn1", false })
rpc(getControllerAlias(), rebalanceChunksWithinDataNode{ "P2-dn1", true })

在資料節點上執行以下命令,檢視遷移進度。

rpc(getControllerAlias(), getRecoveryTaskStatus)

從圖中看,兩次再平衡共計遷移189個分割槽。可以看到,DeleteSource 欄位全部為 False,原因為對於資料節點內部分割槽遷移,從舊盤到新盤,分割槽仍然是在同一個資料節點,所以該引數一直為 False。

等待所有任務執行完畢後,在資料節點上執行以下命令,檢視遷移後各個資料節點各塊磁碟的分割槽數量統計。

def getDiskNo(path) {
    size = path.size()
    result = array(STRING, 0, size)
    for(i in 0 : size) { append!(result, concat(split(path[i], "/")[0:5], "/")) }
    return result
}

select count(*) from pnodeRun(getAllChunks) group by site, getDiskNo(path) as disk

返回結果:

site

disk

count

P1-dn1

/ssd/ssd0/jtwang/chunkData

108

P1-dn1

/ssd/ssd1/jtwang/chunkData

96

P2-dn1

/ssd/ssd0/jtwang/chunkData

111

P2-dn1

/ssd/ssd1/jtwang/chunkData

93

5.2 磁碟擴容時資料遷移效能

P1-dn1 資料節點上各塊磁碟之間分割槽再平衡之後,在資料節點上執行以下命令,統計再平衡整體耗時。

select max(FinishTime - StartTime) as maxDiffTime from rpc(getControllerAlias(), getRecoveryTaskStatus)

返回結果:56秒。96個分割槽佔用磁碟空間約14GB,節點內部再平衡時資料遷移速度約為250MB/s。

場景

是否跨伺服器

網路

硬碟

任務發起併發度

任務執行併發度

遷移速率(MB/s)

磁碟之間分割槽遷移

萬兆

SSD

4

4

250

5.3 磁碟縮容

我們將縮減上一小節中增加的磁碟,縮減之前,需要將各個資料節點新增的磁碟上的分割槽副本遷移至其它磁碟。

在資料節點上執行以下命令,將 P1-dn1、P2-dn1 資料節點 ssd1 盤上的分割槽遷移至 ssd0 盤。

srcPath = "/ssd/ssd1/jtwang/chunkData/CHUNKS"
destPath = "/ssd/ssd0/jtwang/chunkData/CHUNKS"

node = "P1-dn1"
chunkIds = exec chunkId from pnodeRun(getAllChunks) where site = node, path like (srcPath + "%")
rpc(node, moveChunksAcrossVolume{ srcPath, destPath, chunkIds, isDelSrc=true })

node = "P2-dn1"
chunkIds = exec chunkId from pnodeRun(getAllChunks) where site = node, path like (srcPath + "%")
rpc(node, moveChunksAcrossVolume{ srcPath, destPath, chunkIds, isDelSrc=true })

在資料節點上執行以下命令,檢視遷移後各個資料節點各塊磁碟的分割槽數量統計。

select count(*) from pnodeRun(getAllChunks) group by site, getDiskNo(path) as disk

返回結果:

site

disk

count

P1-dn1

/ssd/ssd0/jtwang/chunkData

204

P2-dn1

/ssd/ssd0/jtwang/chunkData

204

此時,各個資料節點 ssd1 盤上已不包含任何分割槽副本,可以進行磁碟縮容。

6 小結

節點與磁碟的擴容和縮容,是生產環境中比較常見的場景。通過對此類場景的模擬和解決方案的實現,我們展示了 DolphinDB 強大、便捷的資料遷移與平衡能力,解決了叢集擴充套件之後如何優化資源使用的問題。

附件

data_move_rebalance