DolphinDB 數據遷移與再平衡

語言: CN / TW / HK

數據遷移再平衡的目標是保證分區副本儘可能均衡分佈,副本位置影響着 IO 性能、節點負載,對於數據訪問延遲有着較大的影響。近來,越來越多的客户對於數據容量或計算性能提出了更高要求,因而進行了集羣擴展,擴展之後如何在新的拓撲之中平衡數據,這是客户必然面臨的問題。為此,我們撰寫了本文,總結數據遷移再平衡的常見場景與方法。

本教程適用於1.30.17、2.00.5及以上版本。

1 概述

DolphinDB 集羣支持擴展節點與存儲,以便增強計算能力與數據容量,那麼新增的節點與磁盤是沒有存儲任何數據的,所有數據依舊存儲在舊的節點與磁盤。也就是説,不同節點或不同磁盤上的數據分佈可能會存在較為嚴重的傾斜,這樣可能會帶來幾個問題:

  • 分佈式查詢計算子任務無法分配到新增的節點,導致該節點計算資源無法被充分利用;

  • 舊的磁盤 IO 壓力過大,新增的磁盤 IO 無法被充分利用。

再者,數據依舊存儲在舊的磁盤,如果舊的數據庫分區存在數據寫入或更新,那麼舊的磁盤空間佔滿的機率相對較大,一旦滿了會引發多種問題。

所以,節點與存儲變更之後的數據遷移與平衡便顯得非常必要。後面章節中,將會重點介紹節點與磁盤變更之後的數據遷移與平衡。

2 環境配置與數據模擬

2.1 硬件配置

測試總共使用三台配置相同的服務器,分別命名為 P1、P2、P3,具體硬件配置如表所示。

處理器

核數

內存

操作系統

硬盤

Intel(R) Xeon(R) Silver 4216 CPU @ 2.10GHz

64

512 GB

CentOS Linux release 7.9

SSD

2.2 集羣配置

DolphinDB Server版本為2.00.9。

基於兩台服務器搭建雙副本普通集羣,假設兩台服務器名稱分別為 P1、P2,P1 部署一個控制節點、一個代理節點、一個數據節點,P2部署一個代理節點、一個數據節點,如表所示。

服務器

IP

端口

節點別名

節點類型

P1

192.168.100.4X

8110

ctl1

控制節點

P1

192.168.100.4X

8111

P1-agent

代理節點

P1

192.168.100.4X

8112

P1-dn1

數據節點

P2

192.168.100.4X

8111

P2-agent

代理節點

P2

192.168.100.4X

8112

P2-dn1

數據節點

節點別名中,ctl 為英文單詞 controller 的縮寫,dn 為英文單詞 datanode 的縮寫。詳細配置情況請參照附件。

2.3 模擬數據

模擬數據為上交所 Level-1 快照數據,基於真實數據結構模擬2000只股票快照數據,基於 OLAP 與 TSDB 存儲引擎的建庫建表、數據模擬、數據插入腳本如下:

model = table(1:0, `SecurityID`DateTime`PreClosePx`OpenPx`HighPx`LowPx`LastPx`Volume`Amount`BidPrice1`BidPrice2`BidPrice3`BidPrice4`BidPrice5`BidOrderQty1`BidOrderQty2`BidOrderQty3`BidOrderQty4`BidOrderQty5`OfferPrice1`OfferPrice2`OfferPrice3`OfferPrice4`OfferPrice5`OfferQty1`OfferQty2`OfferQty3`OfferQty4`OfferQty5, [SYMBOL, DATETIME, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, LONG, LONG, LONG, LONG, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, LONG, LONG, LONG, LONG])

// OLAP存儲引擎建庫建表
dbDate = database("", VALUE, 2020.06.01..2020.06.07)
dbSecurityID = database("", HASH, [SYMBOL, 10])
db = database("dfs://Level1", COMPO, [dbDate, dbSecurityID])
createPartitionedTable(db, model, `Snapshot, `DateTime`SecurityID)

// TSDB存儲引擎建庫建表
dbDate = database("", VALUE, 2020.06.01..2020.06.07)
dbSymbol = database("", HASH, [SYMBOL, 10])
db = database("dfs://Level1_TSDB", COMPO, [dbDate, dbSymbol], engine="TSDB")
createPartitionedTable(db, model, `Snapshot, `DateTime`SecurityID, sortColumns=`SecurityID`DateTime)

def mockHalfDayData(Date, StartTime) {
    t_SecurityID = table(format(600001..602000, "000000") + ".SH" as SecurityID)
    t_DateTime = table(concatDateTime(Date, StartTime + 1..2400 * 3) as DateTime)
    t = cj(t_SecurityID, t_DateTime)
    size = t.size()
    return  table(t.SecurityID as SecurityID, t.DateTime as DateTime, rand(100.0, size) as PreClosePx, rand(100.0, size) as OpenPx, rand(100.0, size) as HighPx, rand(100.0, size) as LowPx, rand(100.0, size) as LastPx, rand(10000, size) as Volume, rand(100000.0, size) as Amount, rand(100.0, size) as BidPrice1, rand(100.0, size) as BidPrice2, rand(100.0, size) as BidPrice3, rand(100.0, size) as BidPrice4, rand(100.0, size) as BidPrice5, rand(100000, size) as BidOrderQty1, rand(100000, size) as BidOrderQty2, rand(100000, size) as BidOrderQty3, rand(100000, size) as BidOrderQty4, rand(100000, size) as BidOrderQty5, rand(100.0, size) as OfferPrice1, rand(100.0, size) as OfferPrice2, rand(100.0, size) as OfferPrice3, rand(100.0, size) as OfferPrice4, rand(100.0, size) as OfferPrice5, rand(100000, size) as OfferQty1, rand(100000, size) as OfferQty2, rand(100000, size) as OfferQty3, rand(100000, size) as OfferQty4, rand(100000, size) as OfferQty5)
}

def mockData(DateVector, StartTimeVector) {
    for(Date in DateVector) {
        for(StartTime in StartTimeVector) {
            data = mockHalfDayData(Date, StartTime)
 
            // OLAP存儲引擎分佈式表插入模擬數據
            loadTable("dfs://Level1", "Snapshot").append!(data)
  
            // TSDB存儲引擎分佈式表插入模擬數據
            loadTable("dfs://Level1_TSDB", "Snapshot").append!(data)
        }   
    }
}

mockData(2020.06.01..2020.06.10, 09:30:00 13:00:00)

上述腳本中,我們模擬了2020.06.01至2020.06.10共計十天的數據。基於 OLAP 與 TSDB 兩種存儲引擎的分區方案是一致的,均採用兩級分區,分區粒度為表級分區,一級按天作了 VALUE 分區,二級按 SYMBOL 類型作了10個 HASH 分區。各個數據節點分區數量統計如下:

select count(*) from pnodeRun(getAllChunks) where dfsPath like "/Level1%" group by site, type

返回結果:

site

type

count

P1-dn1

0

4

P1-dn1

1

200

P2-dn1

0

4

P2-dn1

1

200

其中,type 字段表示分區類別,0表示 File Chunk,1表示 Tablet Chunk。File Chunk 包含 domain、

.tbl 等文件,一個庫對應一個 domain 文件,一個表對應一個 tbl 文件,存儲庫表結構相關的信息;真正存儲數據的區塊我們稱之為 Tablet Chunk。因為配置了雙副本,且在資源允許情況下,副本優先部署在多台物理服務器,所以上述結果符合預期,兩個庫共計包含408個分區。

2.4 注意事項

由於數據遷移再平衡任務比較耗費物理資源,且對於正在寫入、修改、刪除操作的分區,由於分區鎖的佔用,會出現遷移失敗的情況;對於耗時較長的查詢計算任務,由於緩存指向舊的分區路徑,可能會出現中途拋出異常。這些屬於預期內的正常現象。

建議

無寫入、無查詢任務執行時進行數據遷移再平衡操作。

3 函數介紹

本教程使用到的再平衡與數據遷移相關函數介紹如下,以下所有函數均須在控制節點執行,僅管理員用户擁有權限。

3.1 rebalanceChunksAmongDataNodes

函數定義

rebalanceChunksAmongDataNodes([exec=false])

功能描述

集羣之中所有數據節點之間的分區再平衡。

參數含義

exec:默認值為 false,僅根據再平衡算法預估分區遷移結果,並不真正執行;為 true,才會執行遷移。

平衡算法

平衡算法存在一些假定條件,諸如:

  • 舊盤所有已使用空間均是被分區數據佔用;

  • 每個分區大小一致,而實際可能存在偏差;

  • 新盤所有空間均是用來存儲分區數據用的;

實際情況與假定條件存在一定的誤差,如:磁盤除了存放 DolphinDB 數據外存了不少其它內容,分區大小傾斜嚴重等,可能會導致再平衡結果不夠理想。初次執行再平衡後,舊盤、新盤磁盤空間、分區大小等均發生了變化,多次執行再平衡,可以在一定程度上優化再平衡效果。

3.2 rebalanceChunksWithinDataNode

函數定義

rebalanceChunksWithinDataNode(nodeAlias, [exec=false])

功能描述

集羣之中某個數據節點內部、不同磁盤之間的數據再平衡。

參數含義

nodeAlias:字符串,表示節點別名。

exec:默認值為 false,僅根據再平衡算法預估分區遷移結果,並不真正執行;為 true,才會執行遷移。

平衡算法

同上一小節。

3.3 restoreDislocatedTablet

函數定義

restoreDislocatedTablet()

功能描述

當配置分區粒度為表級分區時,同一個分區的所有表將分佈在相同的節點下。當調用函數 rebalanceChunksAmongDataNodes 進行數據平衡時,若出現節點宕機或離線,可能出現同一個分區裏部分表的數據轉移成功,部分表的數據轉移失敗的情況,即同一個分區下的不同表會分佈在不同的節點。該函數可以修復此問題,將同一個分區裏的錶轉移到同一個節點下。

3.4 moveReplicas

函數定義

moveReplicas(srcNode, destNode, chunkId)

功能描述

將源節點上一個或多個分區副本遷移至目標節點。

參數含義

srcNode:字符串,表示源節點別名。

destNode:字符串,表示目標節點別名。

chunkId:字符串/UUID 標量或向量,表示分區 ID。

3.5 moveChunksAcrossVolume

函數定義

moveChunksAcrossVolume(srcPath, destPath, chunkIds, [isDelSrc=true])

功能描述

將磁盤卷源路徑下一個或多個分區文件轉移至目標路徑。

參數含義

srcPath:源分區文件路徑。

destPath:目標分區文件路徑。

chunkIds:分區 ID。

isDelSrc:拷貝成功後是否刪除源分區。

4 節點變更場景下的數據遷移

本章節針對節點擴容、縮容場景下的分區平衡或數據遷移以示例的形式展開介紹。

4.1 節點擴容

增加一台服務器,名稱為 P3,部署一個代理節點、一個數據節點,如表所示。

服務器

IP

端口

節點別名

節點類型

P3

192.168.100.4X

8111

P3-agent

代理節點

P3

192.168.100.4X

8112

P3-dn1

數據節點

首先,在數據節點上執行以下命令:

rpc(getControllerAlias(), rebalanceChunksAmongDataNodes{ false })

返回預估分區遷移結果,如圖所示,預估 P1-dn1、P2-dn1 共有114個分區遷移至 P3-dn1。

然後,在數據節點上執行以下命令,真正地執行分區再平衡。

rpc(getControllerAlias(), rebalanceChunksAmongDataNodes{ true })

在數據節點上執行以下命令,查看任務併發度與再平衡任務執行進度。

rpc(getControllerAlias(), getConfigure{ `dfsRebalanceConcurrency })
rpc(getControllerAlias(), getRecoveryTaskStatus)

任務執行進度如圖所示。可以看到,DeleteSource 字段全部為 True,原因為對於數據節點之間分區再平衡,從源數據節點到目的數據節點複製完成後,該參數會被置為 True,源數據節點會刪除相應副本信息。

Status字段為 In-Progress 的任務數目表示控制節點發起任務時的併發度,通過 dfsRebalanceConcurrency 參數配置,默認為數據節點個數的兩倍。recoveryWorkers 參數表示數據節點執行任務時的併發度,默認為數據節點個數的兩倍。

Status 字段為 Finished,表示該任務已經完成。等待所有任務執行完畢後,在數據節點上執行以下命令,查看再平衡後各個數據節點分區數量統計。

select count(*) from pnodeRun(getAllChunks) group by site

返回結果:

site

count

P1-dn1

196

P2-dn1

98

P3-dn1

114

4.2 節點擴容時數據遷移性能

在數據節點上執行以下命令,統計再平衡整體耗時。

select max(FinishTime - StartTime) as maxDiffTime from rpc(getControllerAlias(), getRecoveryTaskStatus)

返回結果:89秒。114個分區佔用磁盤空間約17GB,節點之間再平衡時數據遷移速度約為200MB/s。

場景

是否跨服務器

網絡

硬盤

任務發起併發度

任務執行併發度

遷移速率(MB/s)

節點之間分區遷移

萬兆

SSD

6

6

200

4.3 節點縮容

我們將縮減上一小節中增加的服務器 P3,縮減之前,需要將 P3-dn1 數據節點上的分區副本遷移至集羣中其它數據節點上。

自定義 moveChunks 函數,入參為一個源節點名稱,內部實現依賴 moveReplicas 函數,遷移源節點上所有分區副本至其它節點上。在數據節點上執行以下命令,執行遷移操作。

def moveChunks(srcNode) {
    chunks = exec chunkId from pnodeRun(getAllChunks) where site = srcNode
    allNodes = pnodeRun(getNodeAlias).node
    for(chunk in chunks) {
        destNode = allNodes[at(not allNodes in (exec site from pnodeRun(getAllChunks) where chunkId = chunk))].rand(1)[0]
        print("From " + srcNode + ", to " + destNode + ", moving " + chunk)
        rpc(getControllerAlias(), moveReplicas, srcNode, destNode, chunk)
    }
}

srcNode = "P3-dn1"
moveChunks(srcNode)

在數據節點上執行以下命令,查看遷移進度。

rpc(getControllerAlias(), getRecoveryTaskStatus)

等待所有任務執行完畢後,在數據節點上執行以下命令,查看遷移後各個數據節點分區數量統計。

select count(*) from pnodeRun(getAllChunks) group by site

返回結果:

site

count

P1-dn1

204

P2-dn1

204

此時,P3-dn1 數據節點上已不包含任何分區副本,可以進行節點縮減。

5 磁盤變更場景下的數據遷移

本章節針對磁盤擴容、縮容場景下的分區平衡或數據遷移以示例的形式展開介紹。

5.1 磁盤擴容

對於每個數據節點增加一塊磁盤。配置文件中 volumes 參數由

volumes=/ssd/ssd0/jtwang/chunkData

更改為:

volumes=/ssd/ssd0/jtwang/chunkData,/ssd/ssd1/jtwang/chunkData

在數據節點上執行以下命令,對於 P1-dn1 數據節點上各塊磁盤之間進行分區再平衡。

rpc(getControllerAlias(), rebalanceChunksWithinDataNode{ "P1-dn1", false })
rpc(getControllerAlias(), rebalanceChunksWithinDataNode{ "P1-dn1", true })

在數據節點上執行以下命令,查看遷移進度。

rpc(getControllerAlias(), getRecoveryTaskStatus)

同樣地,在數據節點上執行以下命令,對於 P2-dn1 數據節點上各塊磁盤之間進行分區再平衡。

rpc(getControllerAlias(), rebalanceChunksWithinDataNode{ "P2-dn1", false })
rpc(getControllerAlias(), rebalanceChunksWithinDataNode{ "P2-dn1", true })

在數據節點上執行以下命令,查看遷移進度。

rpc(getControllerAlias(), getRecoveryTaskStatus)

從圖中看,兩次再平衡共計遷移189個分區。可以看到,DeleteSource 字段全部為 False,原因為對於數據節點內部分區遷移,從舊盤到新盤,分區仍然是在同一個數據節點,所以該參數一直為 False。

等待所有任務執行完畢後,在數據節點上執行以下命令,查看遷移後各個數據節點各塊磁盤的分區數量統計。

def getDiskNo(path) {
    size = path.size()
    result = array(STRING, 0, size)
    for(i in 0 : size) { append!(result, concat(split(path[i], "/")[0:5], "/")) }
    return result
}

select count(*) from pnodeRun(getAllChunks) group by site, getDiskNo(path) as disk

返回結果:

site

disk

count

P1-dn1

/ssd/ssd0/jtwang/chunkData

108

P1-dn1

/ssd/ssd1/jtwang/chunkData

96

P2-dn1

/ssd/ssd0/jtwang/chunkData

111

P2-dn1

/ssd/ssd1/jtwang/chunkData

93

5.2 磁盤擴容時數據遷移性能

P1-dn1 數據節點上各塊磁盤之間分區再平衡之後,在數據節點上執行以下命令,統計再平衡整體耗時。

select max(FinishTime - StartTime) as maxDiffTime from rpc(getControllerAlias(), getRecoveryTaskStatus)

返回結果:56秒。96個分區佔用磁盤空間約14GB,節點內部再平衡時數據遷移速度約為250MB/s。

場景

是否跨服務器

網絡

硬盤

任務發起併發度

任務執行併發度

遷移速率(MB/s)

磁盤之間分區遷移

萬兆

SSD

4

4

250

5.3 磁盤縮容

我們將縮減上一小節中增加的磁盤,縮減之前,需要將各個數據節點新增的磁盤上的分區副本遷移至其它磁盤。

在數據節點上執行以下命令,將 P1-dn1、P2-dn1 數據節點 ssd1 盤上的分區遷移至 ssd0 盤。

srcPath = "/ssd/ssd1/jtwang/chunkData/CHUNKS"
destPath = "/ssd/ssd0/jtwang/chunkData/CHUNKS"

node = "P1-dn1"
chunkIds = exec chunkId from pnodeRun(getAllChunks) where site = node, path like (srcPath + "%")
rpc(node, moveChunksAcrossVolume{ srcPath, destPath, chunkIds, isDelSrc=true })

node = "P2-dn1"
chunkIds = exec chunkId from pnodeRun(getAllChunks) where site = node, path like (srcPath + "%")
rpc(node, moveChunksAcrossVolume{ srcPath, destPath, chunkIds, isDelSrc=true })

在數據節點上執行以下命令,查看遷移後各個數據節點各塊磁盤的分區數量統計。

select count(*) from pnodeRun(getAllChunks) group by site, getDiskNo(path) as disk

返回結果:

site

disk

count

P1-dn1

/ssd/ssd0/jtwang/chunkData

204

P2-dn1

/ssd/ssd0/jtwang/chunkData

204

此時,各個數據節點 ssd1 盤上已不包含任何分區副本,可以進行磁盤縮容。

6 小結

節點與磁盤的擴容和縮容,是生產環境中比較常見的場景。通過對此類場景的模擬和解決方案的實現,我們展示了 DolphinDB 強大、便捷的數據遷移與平衡能力,解決了集羣擴展之後如何優化資源使用的問題。

附件

data_move_rebalance