從一次 SQL 查詢的全過程看 DolphinDB 的執行緒模型

語言: CN / TW / HK

分散式系統較為複雜,無論寫入還是查詢,都需要多個節點的配合才能完成操作。本教程以一個分散式 SQL 查詢為例,介紹 DolphinDB 分散式資料庫的資料流以及其中經歷的各類執行緒池。通過了解 SQL 查詢的全過程,也可以幫助我們更好地優化 DolpinDB 的配置和效能。

1. DolphinDB 執行緒型別

woker
常規互動作業的工作執行緒,用於接收客戶端請求,將任務分解為多個小任務,根據任務的粒度自己執行或者傳送給 local executor 或 remote executor 執行。

local executor
本地執行執行緒,用於執行 worker 分配的子任務。每個本地執行執行緒一次只能處理一個任務。所有工作執行緒共享本地執行執行緒。plooppeach 等平行計算函式的計算任務分配在本地執行執行緒完成。

remote executor
遠端執行執行緒,將遠端子任務傳送到遠端節點的獨立執行緒。

batch job worker
使用 submitJob  submitJobEx 建立批處理作業的工作執行緒。該執行緒在任務執行完後若閒置 60 秒則會被系統自動回收,不再佔用系統資源。

web worker
處理 HTTP 請求的工作執行緒。DolphinDB 提供了基於 web 的叢集管理介面,使用者可以通過 web 與 DolphinDB 節點進行互動,提交的請求由該執行緒處理。

secondary worker
次級工作執行緒。當前節點產生的遠端子任務,會在遠端節點的次級工作執行緒上執行,用於避免作業環,解決節點間的任務迴圈依賴而導致的死鎖問題。

dynamic worker
動態工作執行緒。當所有的工作執行緒被佔滿且有新任務時,系統會自動建立動態工作執行緒來執行任務。根據系統併發任務的繁忙程度,總共可以建立三個級別的動態工作執行緒,每一個級別可以建立 maxDymicWorker 個動態工作執行緒。該執行緒在任務執行完後若閒置 60 秒則會被系統自動回收,不再佔用系統資源。

infra worker
基礎設施處理執行緒。當開啟元資料高可用或流資料高可用的時候,系統會自動建立基礎設施處理執行緒,用於處理叢集節點間的 raft 資訊同步工作。

urgent worker
緊急工作執行緒,只接收一些特殊的系統級任務,如登入 login ,取消作業 cancelJobcancelConsoleJob 等。

diskIO worker
磁碟資料讀寫執行緒,通過引數 diskIOConcurrencyLevel 控制。如果 diskIOConcurrencyLevel = 0,表示直接用當前執行緒來讀寫磁碟資料。如果 diskIOConcurrencyLevel > 0,則會建立相應個數的指定執行緒來讀寫磁碟資料。

2. 不同型別執行緒與配置引數的關係

執行緒型別 配置引數 預設配置
woker wokerNum 預設值是 CPU 的核心數
local executor localExecutors 預設值是 CPU 核心數減1
remote executor remoteExecutors 預設值是1
batch job worker maxBatchJobWorker 預設值是 workerNum 的值
web worker webWorkerNum 預設值是1
secondary worker secondaryWorkerNum 預設值是 workerNum 的值
dynamic worker maxDynamicWorker 預設值是 workerNum 的值
infra worker infraWorkerNum 預設值是2
urgent worker urgentWorkerNum 預設值是1
diskIO worker diskIOConcurrencyLevel 預設值是1

3. API 發起一次 SQL 查詢的執行緒經歷

DolphinDB 的主要節點型別:

  • controller
    控制節點,負責收集代理節點和資料節點的心跳,監控每個節點的工作狀態,管理分散式檔案系統的元資料和事務。
  • data node
    資料節點,既可以儲存資料,也可以完成查詢和複雜的計算。
  • compute node
    計算節點,只用於計算,應用於包括流計算、分散式關聯、機器學習、資料分析等場景。計算節點不儲存資料,故在該節點上不能建庫建表,但可以通過 loadTable 載入資料進行計算。可以通過在叢集中配置計算節點,將寫入任務提交到資料節點,將所有計算任務提交到計算節點,實現儲存和計算的分離。2.00.1版本開始支援計算節點。

綜上,API 發起的 SQL 查詢可以提交到一個協調節點(coordinator) 來完成資料的查詢和計算請求。 coordinator 可以是叢集中的 data node 或者 compute node。當使用 2.00.1 及以上版本時,使用者可以通過在叢集中配置 compute node,將 SQL 查詢任務全部提交到 compute node,實現儲存和計算的分離。下面以 API 向 coordinator 發起一次 SQL 查詢為例,講述整個過程中所排程的所有執行緒。

 

 

step1:DolphinDB 客戶端向 coordinator 發起資料查詢請求

以 coordinator 為 data node 為例,例如發起一次聚合查詢,查詢語句如下:

select avg(price) from loadTable("dfs://database", "table") where year=2021 group by date

假設上述聚合查詢語句總共涉及 300 個分割槽的資料,且正好平均分配在三個資料節點。

data node1:100 chunks;data node2:100 chunks;data node3:100 chunks

DolphinDB 客戶端將查詢請求進行二進位制序列化後通過 tcp 協議傳輸給 data node1。

step2:data node1 收到查詢請求

data node1 收到客戶端的查詢請求後,將分配 1 個 worker 執行緒對內容進行反序列化和解析。當發現內容是 SQL 查詢時,會向 controller 發起請求,獲取跟這個查詢相關的所有分割槽的資訊。整個 SQL 查詢未執行完畢時,當前的 worker 執行緒會被一直佔用。

step3:controller 收到 data node1 的請求

controller 收到 data node1 的請求後,將分配 1 個 worker 執行緒對內容進行反序列化和解析,準備好本次 SQL 查詢涉及的資料分割槽資訊後,由該 worker 執行緒序列化後通過 tcp 協議傳輸給 data node1。controller 的 worker 完成該工作後將從佇列中獲取下一個請求。

data node1:100 chunks;data node2:100 chunks;data node3:100 chunks

step4:data node1 收到 controller 返回的資訊

data node1 收到 controller 返回的資訊後,由一直佔用的 worker 執行緒對內容進行反序列化和解析,得知本次 SQL 查詢涉及的資料分割槽資訊後,將位於本節點的分割槽資料計算任務傳送到本地任務佇列,此時本地任務佇列會產生 100 個子任務。同時,將在遠端節點 data node2、data node3 的分割槽資料計算任務以 group task 的方式傳送到遠端任務佇列,所以遠端任務佇列會被新增2個遠端任務,分別打上 data node2 和 data node3 的標誌。

step5(1):本地 worker 和 local executor 消費本地任務佇列

此時,一直佔用的 worker 執行緒和 local executor 執行緒會同時並行消費本地任務佇列的子任務。所以配置項中的 wokerNum 和 localExecutors 很大程度上決定了系統的併發計算能力。

step5(2)(3):本地 remote executor 傳送遠端任務至遠端節點

同時,remote executor 執行緒將遠端任務佇列的內容序列化後通過 tcp 協議分別傳送到 data node2 和 data node3。

step6(1)(2):遠端節點收到遠端任務

data node2 和 data node3 收到遠端任務後,將分配 1 個 secondary worker 執行緒對內容進行反序列化和解析,並將計算任務傳送到本地任務佇列,此時 data node2 和 data node3 的本地任務佇列都會產生 100 個子任務。

step7(1)(2):遠端節點 secondary worker 和 local executor 消費本地任務佇列

此時,data node2 和 data node3 上一直佔用的 secondary worker 執行緒和 local executor 執行緒會同時並行消費本地任務佇列的子任務。所以配置項中的 secondaryWorkerNum 對系統的併發計算能力也有一定影響。

step8(1)(2):遠端節點返回中間計算結果至 data node1

當 data node2 和 data node3 涉及的計算任務完成後,分別得到了本次 SQL 查詢的中間計算結果,由一直佔用的 secondary worker 執行緒對內容進行序列化後通過 tcp 協議傳輸給 data node1。

step9:data node1 計算最終結果並返回給客戶端

data node1 接收到 data node2 和 data node3 返回的中間計算結果後,由一直佔用的 worker 執行緒對內容進行反序列化,然後在該執行緒上計算出最終結果,並在序列化後通過 tcp 協議傳輸給客戶端。

DolphinDB 客戶端接收到 data node1 返回的資訊後,經過反序列化顯示本次 SQL 查詢的結果。

coordinator 為 data node 和 compute node 的區別

  • compute node 不儲存資料,所以 compute node 解析客戶端的 SQL 查詢後,從 controller 拿到本次 SQL 查詢涉及的資料分割槽資訊,會將所有資料查詢任務都分配到 data node 執行,得到每個 data node 返回的中間結果,最後排程 compute node 的本地資源計算最終結果並返回給客戶端。
  • 將所有 SQL 查詢都提交到 compute node 後,可以實現儲存和計算的分離,減輕 data node 的計算工作負擔。當實時寫入的資料量非常大時,建議配置 compute node,將所有 SQL 查詢都提交到 compute node,實現儲存和計算的分離。2.00.1版本開始支援計算節點。

4. SQL 查詢過程分析

通過對 API 發起一次 SQL 查詢的執行緒經歷統計分析可以發現,本次 SQL 查詢一共發生了 8 次 tcp 傳輸,其中 2 次是 DolphinDB server 和 DolphinDB client 之間的傳輸。如果查詢結果的資料量比較大,但對查詢結果的延時性又比較敏感,可以優化的方向主要有以下幾個:

  • 叢集節點間為內網通訊,推薦萬兆乙太網。
  • DolphinDB server 和 DolphinDB client 間為內網通訊。
  • API 指定查詢結果返回進行資料壓縮。
  • 執行緒配置引數優化。
  • SQL 語句優化:where 條件新增分割槽欄位的資訊過濾,起到分割槽剪枝的目的,避免全表掃描,這樣可以大大減少子任務的數目。
  • 增加每個節點的磁碟卷的數量。這樣更多的磁碟可以並行讀取分割槽的資料。
  • 增加license 限制的 CPU 核心數和記憶體大小,提升系統的並行或併發處理能力。

5. 執行緒配置引數優化

wokerNum
如果 license 限制的 CPU 核心數大於物理機 CPU 核心數,推薦 wokerNum 等於物理機 CPU 核心數;如果 license 限制的 CPU 核心數小於等於物理機 CPU 核心數,推薦 wokerNum 等於 license 限制的 CPU 核心數。

localExecutors
推薦 localExecutors = wokerNum - 1

remoteExecutors
推薦 remoteExecutors = n - 1,n 表示叢集的節點數。如果是單節點 single 模式或者是單資料節點叢集,不需要配置 remoteExecutors 的值。

maxBatchJobWorker
推薦 maxBatchJobWorker = wokerNum,採用預設值。

webWorkerNum
推薦控制節點 webWorkerNum 可以配置為 4,資料節點 webWorkerNum 可以配置為 1。因為正常情況下很少通過 web 與 DolphinDB 節點互動的方式提交查詢任務。

secondaryWorkerNum
推薦 secondaryWorkerNum = wokerNum,採用預設值。

maxDynamicWorker
推薦 maxDynamicWorker = wokerNum,採用預設值。

infraWorkerNum
推薦 infraWorkerNum = 2,採用預設值。

urgentWorkerNum
推薦 urgentWorkerNum = 1,採用預設值。

diskIOConcurrencyLevel
對於 hdd 磁碟,推薦 diskIOConcurrencyLevel 等於對應節點下通過 volumes 引數配置的磁碟個數。對於 ssd 磁碟,推薦 diskIOConcurrencyLevel = 0。