時序資料庫 DolphinDB 執行緒簡介

語言: CN / TW / HK

本文基於 DolphinDB server 最新版 2.00.X,從任務管理、儲存引擎、流資料、叢集管理、高可用幾個方面,為大家簡單介紹 DolphinDB 在執行中可能使用到的各種執行緒,及其相關的配置項和函式,以便使用者掌握 DolphinDB 的執行緒執行情況。

1.任務管理

工作執行緒(Worker)接收客戶端請求,將任務分解為多個小任務,根據任務的粒度自己執行或者傳送給執行執行緒 localExecutor 或 remoteExecutor 執行。

Worker

常規互動作業的工作執行緒。每個節點都存在 Worker 執行緒,可以分為以下幾類。

  • ZeroWorker, FirstWorker, SecondWorker, ThirdWorker, ForthWorker

客戶端提交至節點的作業為0級,由 ZeroWorker 處理。根據作業所涉及到的分割槽,ZeroWorker 將其分解為多個子任務。其中本地節點上的子任務由 ZeroWorker 與 localExecutor 並行執行;需要由遠端節點執行的子任務則降低為1級,並通過 remoteExecutor 傳送到對應節點上的 FirstWorker 處理。

以此類推,若某個級別的子任務需要進一步拆解,則拆分出來的由遠端節點執行的子任務降低一級,傳送至遠端節點上對應層級的 Worker 處理。

ZeroWorker 和 FirstWorker 的數量由 workerNum 決定,預設值為機器上的 CPU 核數,最大值不超過 license 中的最大 核數。其餘層級的 Work 數量為上級的0.75倍,最小個數為1。

  • UrgentWorker

處理緊急任務,只接收一些特殊的系統級任務,譬如登入,取消作業等。由 urgentWorkerNum 配置,預設值為1,最大值為 CPU 核心數。

  • WebWorker

處理 HTTP 請求,由 webWorkerNum 配置。預設為1,最大值為 CPU 核心數。

  • InfraWorker

開啟高可用後,用於接收 raft 心跳彙報的執行緒,防止叢集負載大時,心跳資訊無法及時彙報。預設有2個該執行緒。

LocalExecutor

本地執行執行緒。Worker 拆解完任務後生成的本地子任務佇列,由同一節點下的 localExecutor 執行。所有 Worker共享本地執行執行緒,每個 localExecutor 一次只能處理一個子任務。通過 localExecutors 配置執行緒個數,預設為機器上的 CPU 核數減1。

RemoteExecutor

將遠端任務傳送到遠端節點的執行緒,在非 single 模式的節點上可以通過 remoteExecutors 配置執行緒個數。預設值為叢集中節點個數和本地 Worker 的較小值。

AsynchronousRemoteExecutor

接收對其他節點發起的遠端呼叫(Remote Procedure Call, RPC)任務的執行緒,並將收到的遠端呼叫放到 RemoteExecutor 的任務佇列中。每個非 single 模式的節點上有且僅有一個該執行緒。

RemoteTaskDispatcher

在遠端調用出錯需要重試時,或者一個被關閉的連線上仍有未完成的任務時,這些任務會先放到一個佇列裡,由 RemoteTaskDispatcher 從這個佇列取任務並重新交由 AsynchronousRemoteExecutor 去發起遠端呼叫。

DynamicWorkerManager 和 DynamicWorker

DynamicWorker 是動態工作執行緒,作為 Worker 的補充。DynamicWorkerManager 是建立 DynamicWorker 的執行緒,每個節點有且僅有一個該執行緒。如果所有的工作執行緒被佔滿,有新任務到來時,通過該執行緒建立 DynamicWorker 來執行新任務。根據系統併發任務的繁忙程度,總共可以建立三組動態工作執行緒,每一個級別可以建立 maxDynamicWorker 個動態工作執行緒。

動態工作執行緒在任務執行完後若閒置60秒則會被系統自動回收,不再佔用系統資源。maxDynamicWorker 的預設值為 workerNum。

DynamicExecutorManager 和 DynamicExecutor

與 DynamicWorkerManager 和 DynamicWorker 類似,DynamicExecutor 是由 DynamicWorkerManager 動態建立的執行執行緒。DynamicExecutor 執行緒數量上限由 maxDynamicLocalExecutor 決定,預設值為 localExecutors。DynamicWorkerManager 最多可以建立3組動態執行執行緒,每組執行緒最多為 maxDynamicLocalExecutor 個。

DynamicExecutor 在閒置60秒後被自動回收。

BlockIOWorker

執行對硬碟讀寫任務的執行緒。通過 diskIOConcurrencyLevel 控制執行緒數量,預設值為1。

BatchJobWorker

執行批處理作業任務的工作執行緒。其上限通過 maxBatchJobWorker 設定,預設值是 workerNum。該執行緒在任務執行完後若閒置60秒會被系統自動回收,不再佔用系統資源。

2.儲存引擎

儲存相關的執行緒在 server 啟動過程中建立。其主要負責資料的寫入與落盤,並在各種情況下(如節點宕機,磁碟損壞導致資料損壞)維護各節點間資料的完整與一致性。

2.1 預寫日誌

在資料節點上,OLAP 和 TSDB 都有兩個寫預寫日誌(WAL)的執行緒 RedoLogHeadWriter 和 RedoLogDataWriter,分別負責寫 WAL 的元資料和資料。

儲存 OLAP 和 TSDB WAL 日誌的目錄分別由 redoLogDirTSDBRedoLogDir 配置。

  • RedoLogHeadWriter
    事務發生時通過 RedoLogHeadWriter 同步寫事務元資料資訊到 redoLog 目錄下的 header.log 中。
  • RedoLogDataWriter
    事務的資料會通過 RedoLogDataWriter 非同步寫入 redoLog 目錄下的 tid.log 中。

2.2 OLAP 引擎

在開啟了 OLAP 的 cacheEngine 後,會建立一個 ChunkCacheEngineGCWorker 執行緒。

  • ChunkCacheEngineGCWorker
    負責將 cacheEngine 中的資料寫入磁碟的執行緒。用於清理 cacheEngine,並將磁碟的隨機寫變成順序寫的執行緒。BackgroundRoutineService 每隔60秒,或者寫入時 cacheEngine 的佔用量超過 OLAPCacheEngineSize 的30%,就會觸發 ChunkCacheEngineGCWorker 將 cacheEngine 中的資料寫入磁碟。cacheEngine 中一張表的一個分割槽資料稱為一個 tabletCache,ChunkCacheEngineGCWorker 寫入磁碟時會根據每個 tabletCache 的大小和在 cacheEngine 中存在時間決定寫入磁碟優先順序,tabletCache 的大小越大、存在時間越久寫入磁碟優先順序越高。

2.3 TSDB 引擎(1.30.X 版本 server 無此類執行緒)

除了將資料寫入磁碟外,TSDB 引擎的執行緒還要負責 cacheEngine 中資料的排序與合併,並維護磁碟上的 levelFile,以提高讀寫效能。這些執行緒也僅在資料節點上存在。

  • TableAsyncSortRunner
    非同步地對 TSDB cacheEngine 中的表進行排序的執行緒。TSDB 在寫入 cacheEngine 時,如果 cacheEngine 中的表太大,會影響查詢效能,因此需要進行排序。但若同步進行排序,會影響寫入效能。所以 DolphinDB 提供此執行緒非同步地對錶進行排序。可以通過 TSDBAsyncSortingWorkerNum 來控制排序執行緒的數量,預設值為1。也可以通過函式 disableTSDBAsyncSorting 和 enableTSDBAsyncSorting,來手動開啟和關閉非同步排序功能。
  • CacheDumpTaskDispatcher
    分配 cacheEngine 寫入磁碟任務的執行緒。執行緒數量固定為1。 當 cacheEngine 的記憶體佔用大於 TSDBCacheEngineSize 時,系統會對 cacheEngine 做一次快照,並將快照送到 CacheDumpTaskDispatcher 執行緒準備寫入磁碟。CacheDumpTaskDispatcher 執行緒將任務分配給 ChunkCacheDumpRunner 執行緒,由該執行緒寫入磁碟。若磁碟上存在需要合併的 levelFile,則交由 MergeRunner 執行緒進行合併。
  • ChunkCacheDumpRunner
    將 cacheEngine 中的資料寫入磁碟的執行緒。執行緒的個數等於 volumes 的配置值。
  • MergeRunner
    對磁碟上 levelFile 進行合併的執行緒,執行緒的個數等於 volumes 的配置值。
  • DelLevelFileRunner
    檢查並刪除無效的 levelFile(即已經被合併的較小 size 的檔案)的執行緒。執行緒數量固定為1。每隔30秒會自動執行一次。

2.4 資料恢復

資料恢復(recovery)相關執行緒負責節點宕機,或者資料損壞時,資料副本間的資料恢復。

  • RecoveryReportingService
    在資料節點上,任何一個 chunk 發生資料錯誤或者版本號不一致都通過該執行緒來向控制節點彙報。每個資料節點有且僅有一個該執行緒。
  • RecoveryWorker
    發生 recovery 時,資料恢復的源節點將資料傳送給目標節點的執行緒。該執行緒僅存在於資料節點,個數可由 recoveryWorkers 配置,預設值為1。可以通過 resetRecoveryWorkerNum 函式動態修改執行緒個數,通過 getRecoveryWorkerNum 函式獲取實際 RecoveryWorker 執行緒數量。
  • RecoverMetaLogWriter 和 RecoverRedoLogDataWriter
    線上恢復(onlineRecovery)過程中,為了避免節點宕機或離線影響恢復過程,會分別通過 RecoverMetaLogWriter 和 RecoverRedoLogDataWriter 寫 recover redoLog 的元資料(Metadata)和資料(data)。與 redoLog 不同的是,recover redoLog 的 Metadata 和 data 需要進行寫磁碟時才能開始 recovery。通過 enableDfsRecoverRedo 配置是否開啟 recover redoLog,預設是開啟。開啟後,在每個資料節點上存在一個相應的執行緒。recover redoLog 的檔案目錄也可以通過 recoverLogDir 配置,預設在節點根目錄下的 log/recoverLog 中。
  • DFSChunkRecoveryWorker
    在控制節點上處理 recovery 任務的執行緒。同時進行 recovery 任務的數量預設為叢集中資料節點個數的2倍,可由 dfsRecoveryConcurrency 配置。

2.5 事務相關

如果叢集中的某個節點在處理事務的過程中宕機了,那麼重啟後僅依靠該節點有可能無法確定事務的最終狀態,需要在叢集中進行事務決議來確定。

  • UnresolvedTransactionReporter
    在資料節點啟動時,如果資料節點自己不能判斷某些事務的狀態,通過該執行緒來向控制節點彙報併發起事務決議,判斷事務最終處於回滾還是完成狀態。該執行緒只有一個,且在所有需要決議的事務決議後結束。
  • DFSTransactionResolutionWorker
    該執行緒處理由資料節點發起的事務決議、控制節點啟動時回放元資料後無法決定狀態的事務或執行時超時未更新狀態的事務。在控制節點上存在一個該執行緒。
  • ChunkNodeLogWriter
    資料節點寫元資料的執行緒。元資料預設在各個資料節點根目錄下的 storage/CHUNK_METADATA 中,可以通過配置項 chunkMetaDir 修改。在資料節點上存在一個該執行緒。
  • EditLogBatchWriter
    控制節點寫元資料的執行緒。由於對控制節點上元資料的修改比較頻繁,所以由該執行緒統一將寫入緩衝區的資料寫入磁碟並同步,同時還對寫元資料失敗的情況進行回滾處理。在控制節點上存在一個該執行緒。

2.6 其他

  • SnapshotEngineWorker
    為減少開啟快照引擎對寫入的影響,而將分散式表資料非同步寫入快照引擎的執行緒。在資料節點上存在一個該執行緒。可以通過函式 registerSnapshotEngine 和 unregisterSnapshotEngine 對一個分散式表註冊和取消註冊快照引擎。
  • DFSChunkRebalanceWorker
    節點間平衡資料或者多塊磁碟間平衡資料的任務,均交由控制節點上的 DFSChunkRebalanceWorker 執行緒處理。在控制節點上存在一個該執行緒。同時發起的資料平衡任務數量預設為叢集中資料節點個數的兩倍。可由 dfsRebalanceConcurrency 配置。通過函式 rebalanceChunksWithinDataNode 和 rebalanceChunksAmongDataNodes 手動觸發節點或磁碟間的資料平衡。

3.流資料

本節通過釋出訂閱、計算引擎和高可用三個模組介紹流資料相關執行緒。這些執行緒都僅在資料節點或單節點上存在。

3.1 釋出訂閱

以下為資料節點上普通流表的訂閱釋出流程中涉及到的執行緒。

  • MessageThrottle
    實現流資料訂閱 throttle 引數功能的執行緒,數量為1。系統每隔一段時間檢查當前節點上是否存在經過 throttle 時間但仍未達到 batchSize 的訂閱(subscribeTable 函式中指定了 batchSize 和 throttle )。如果存在,則觸發一次訂閱的訊息處理。通過 subThrottle 配置觸發檢查的間隔時間,預設值為1000,單位為毫秒。
  • AsynchronousPublisher
    在 AsynchronousPublisher 執行緒中檢查每個釋出節點對每個訂閱節點建立的連線。如果這個連線對應的釋出佇列有更新,就將更新的資料釋出到訂閱端。通過 maxPubConnections 配置釋出節點連線的訂閱節點數量上限,預設值為0,表示不可以作為釋出節點,即不會建立 AsynchronousPublisher 執行緒,大於0時會建立一個該執行緒。
  • AsynchronousSubscriber
    監聽所有的訂閱連線,接收、解析連線上收到的資料,併發送到相應的訂閱訊息佇列。配置了subPort之後,會建立一個該執行緒。 通過 maxSubConnections 配置一個訂閱節點可以連線的釋出節點數量上限,預設值為64。
  • LocalSubscriberImp
    在 LocalSubscriberImp 執行緒中直接檢查有資料更新的本地訂閱,並將符合條件的本地訂閱中的資料傳送到訂閱訊息佇列中。配置 subPort 後,會建立一個該執行緒。
  • StreamExecutor
    StreamExecutor 執行緒從訂閱訊息佇列中取出資料,寫入相應訂閱的 handler 中,同時維護訂閱的偏移量、訊息總數等資訊。每個訂閱訊息佇列對應一個 StreamExecutor 執行緒,數量由配置項 subExecutors 決定,預設值為1,最大不超過 CPU 核數。
  • PersistenceWorker
    以非同步方式持久化的流表會通過 PersistenceWorker 執行緒將資料寫到磁碟上。persistenceWorkerNum 控制持久化執行緒的數量,預設為1。由 persistenceDir 配置開啟持久化的流表的儲存路徑。
  • AsynchronousReconnector
    針對所有設定引數 reconnect=true 的訂閱,系統會在非正常中斷後通過該執行緒嘗試自動重連。在配置了subPort 之後,會建立一個該執行緒。

3.2 計算引擎

建立計算引擎時,若配置瞭如下引數,便會建立兩個執行緒:CheckTimeExecutor 和 SystemTimeExecutor。

  • CheckTimeExecutor
    包括 TimeSeriesCheckTimeExecutor, SessionCheckTimeExecutor, CSEngineCheckTimeExecutor, AsofJoinCheckTimeExecutor 和 LookupJoinCheckTimeExecutor。
    在建立 TimeSeriesEngine 時設定了 updateTime、建立 SessionWindowEngine 時設定了 forceTriggerTime、建立 CrossSectionalEngine 時設定了 triggeringPattern=“interval”、建立 AsofJoinEngine 時設定了 delayedTime、建立 LookupJoinEngine 時設定了 checkTimes,那麼每個引擎就會建立一個 CheckTimeExecutor 執行緒,表示如果經過了引數設定的時間還未觸發計算,則強制觸發一次引擎的計算。
  • SystemTimeExecutor
    包括 TimeSeriesSystemTimeExecutor, SessionSystemTimeExecutor, CrossSectionalEngineExecutor 和 WindowJoinSystemTimeExecutor。
    在建立 TimeSeriesEngine, SessionWindowEngine, CrossSectionalEngine 和 WindowJoinEngine 時,如果設定了 useSystemTime=true,那麼每個引擎就會建立一個 SystemTimeExecutor 執行緒,表示每隔固定的時間觸發一次引擎的計算。

3.3 流資料高可用

配置項 streamingRaftGroups 中每個 group 都會在 group 內的節點上生成下述的三個執行緒。

  • StreamingDataFileWriter
    在 raft 的 leader 節點上向流表寫資料時,要通過該執行緒應用 leader 上寫資料的 entryLog,向流表寫資料。
  • StreamingRaftReplayWorker
    當一個節點成為某個 group 的 leader 時,就會通過該執行緒回放此 group 的 raftLog。
  • StreamingHA::CkptWorker
    為節點上的 raftLog 做 checkpoint 以回收垃圾的執行緒。垃圾回收的間隔可由 streamingHAPurgeInterval 設定,預設值為300,單位是秒。

4.叢集管理

在叢集中控制節點通過心跳監控其他節點的存活狀態。

  • HeartBeatSender
    控制資料節點或計算節點向控制節點每隔0.5秒傳送一次心跳的執行緒。心跳資訊中同時還會彙報節點當前的一些資訊(如 CPU、記憶體、磁碟佔用)給控制節點。在資料節點或計算節點上存在一個該執行緒。 通過 lanCluster 控制心跳採用 udp 或 tcp 協議,當為 true 時使用 udp,false 時使用 tcp,預設值為true。
  • HeartBeatReceiver
    僅當 lanCluster=true 時,在控制節點和資料節點、計算節點上存在的接收 udp 心跳的執行緒。
  • HeartBeatMonitor
    僅在控制節點存在的執行緒。每隔一秒檢查一次是否收到叢集中資料節點或計算節點的心跳資訊。如果一個節點連續3次檢查都沒有心跳,就認為這個節點已經宕機了。 如果資料節點配置了 datanodeRestartInterval(值大於0),那麼當節點宕機時間超過設定值,就會通過 agent 重啟該資料節點。該配置項預設值為0。
  • ServiceMgmtCenter
    僅在控制節點存在的執行緒。當一個代理節點重新上線時,通過該執行緒將公鑰資訊儲存到代理節點上。當一個數據節點重新上線時,會讓其彙報節點的所有 chunk 資訊,並且在資料節點上刪除控制節點上不存在的chunk。

5.控制節點高可用

本節簡述開啟控制節點高可用之後,raft 相關的執行緒。對於每種執行緒,在 raft group 內的每個控制節點上都有且僅有一個。

  • RaftTimer
    負責計時(心跳傳送間隔和發起選舉時間)的執行緒。leader 通過該執行緒每隔一段時間向 follower 傳送心跳資訊,follower 如果一段時間沒有收到 leader 的心跳,將發起選舉。 通過 raftElectionTick 可以設定在 [raftElectionTick, 2*raftElectionTick] 之間的一個隨機時間後未收到 leader 的心跳將發起選舉,預設值為800,單位是10ms。
  • RaftInputWorker
    從輸入訊息佇列取出訊息應用到當前節點的執行緒。
  • RaftOutputWorker
    從輸出訊息佇列取出訊息並應用到相應節點的執行緒。
  • RaftProposeWorker
    處理對 raftLog 讀寫請求的執行緒。
  • SnapshotSender
    將 leader 當前狀態的快照發送給其他節點的執行緒。
  • RaftLeaderSwitchWorker
    執行 raft 節點角色切換的執行緒。
  • DFSRaftReplayWorker
    將記錄的 raftLog 應用到當前節點的執行緒。

6.其他

  • ThreadPoolSocketGroup
    在 server 的埠上監聽收到的訊息請求,並交由相應的工作佇列處理。每個節點有且僅有一個執行緒。
  • BackgroundRoutineService
    server 的後臺執行緒,每個節點會生成 4 個該執行緒。server 會在該執行緒中註冊一些函式,這些函式會在BackgroundRoutineService 執行緒執行過程中每隔一段時間就被呼叫一次。
  • LogWriter
    將節點執行過程中生成的 log 寫入檔案的執行緒。每個節點都有一個該執行緒。
  • StdConsole
    啟動 server 後在命令列視窗接收命令的執行緒。在 server 啟動引數中如果設定 console=true,那麼就會啟動一個該執行緒。