TiFlash 計算層概覽

語言: CN / TW / HK

本文選自《TiDB 6.x in Action》,分為 TiDB 6.x 原理和特性、TiDB Developer 體驗指南、TiDB 6.x 可管理性、TiDB 6.x 核心優化與效能提升、TiDB 6.x 測評、TiDB 6.x 最佳實踐 6 大內容模組,匯聚了 TiDB 6.x 新特性的原理、測評、試用心得等等乾貨。不管你是 DBA 運維還是應用開發者,如果你正在或有意向使用 TiDB 6.x,這本書都可以給你提供參考和實踐指南。

TiFlash 是 TiDB 的分析引擎,是 TiDB HTAP 形態的關鍵元件。TiFlash 原始碼閱讀系列文章將從原始碼層面介紹 TiFlash 的內部實現。主要包括架構的演進,DAGRequest 協議、dag request 在 TiFlash 側的處理流程以及 MPP 基本原理。

本文作者:徐飛,PingCAP 資深研發工程師

背景

img

上圖是一個 TiDB 中 query 執行的示意圖,可以看到在 TiDB 中一個 query 的執行會被分成兩部分,一部分在 TiDB 執行,一部分下推給儲存層(TiFlash/TiKV)執行。本文我們主要關注在 TiFlash 執行的部分。

img

這個是一個 TiDB 的查詢 request 在 TiFlash 內部的基本處理流程,首先 Flash service 會接受到來自 TiDB 的 RPC 請求,然後會從請求裡拿到 TiDB 的 plan,在 TiFlash 中我們稱之為 DAGRequest,拿到 TiDB 的 plan 之後,TiFlash 需要把 TiDB 的 plan 編譯成可以在 TiFlash 中執行的 BlockInputStream,最後在得到 BlockInputStream 之後,TiFlash 就會進入向量化執行的階段。本文要講的 TiFlash 計算層實際上是包含以上四個階段的廣義上的計算層。

TiDB + TiFlash 計算層的演進

首先,我們從 API 的角度來講一下 TiDB + TiFlash 計算層的演進過程:

img

最開始在沒有引入 TiFlash 時,TiDB 是用過 Coprocessor 協議來與儲存層(TiKV)進行互動的,在上圖中,root executors 表示在 TiDB 中單機執行的運算元,cop executors 指下推給 TiKV 執行的運算元。在 TiDB + TiKV 的計算體系中,有如下幾個特點:

  • TiDB 中的運算元是在 TiDB 中單機執行的,計算的擴充套件性受限;

  • TiKV 中的運算元是在 TiKV 中執行的,而且 TiKV 的計算能力是可以隨著 TiKV 節點數的增加而線性擴充套件的;

  • 因為 TiKV 中並沒有 table 的概念,Coprocessor 是以 Region 為單位的,一個 region 一個 coprocessor request;

  • 每個 Coprocessor 都會帶有一個用於 MVCC 讀的 timestamp,在 TiFlash 中我們稱之為 start_ts。

在 TiDB 4.0 中,我們首次引入了 TiFlash:

img

在引入之初,我們基本上就是隻對接了現有的 Coprocessor 協議,可以看出上面這個圖上之前 TiDB + TiKV 的圖其實是一樣的,除了儲存層從 TiKV 變成了 TiFlash。但是本質上講引入 TiFlash 之前 TiDB + TiKV 是一個面向 TP 的系統,TiFlash 在簡單對接 Coprocessor 協議之後,馬上發現了一些對 AP 很不友好的地方,主要有兩點:

  • Coprocessor 是以 region 為單位的,而 TiDB 中預設 region 大小是 96 MB,這樣對於一個 AP 的大表,可能會包含成千上萬個 region,這導致一個 query 就會有成千上萬次 RPC;

  • 每個 Coprocessor 只讀一個 region 的資料,這讓儲存層很多讀相關的優化都用不上。

在發現問題之後,我們嘗試對原始的 Coprocessor 協議進行改進,主要進行了兩次嘗試:

  • BatchCommands:這個是 TiDB + TiKV 體系裡就有的一個改進,原理就是在傳送的時候將傳送給同一個儲存節點的 request batch 成一個,對於 TiFlash 來說,因為只支援 Coprocessor request,所以就是把一些 Coprocessor request batch 成了一個。因為 batch 操作是傳送端最底層做的,所以 batch 在一起的 Coprocessor request 並沒有邏輯上的聯絡,所以 TiFlash 拿到 BatchCoprocessor 之後也就是每個 Coprocessor request 依次處理。所以 BatchCommands 只能解決 RPC 過多的問題。

  • BatchCoprocessor:這個是 TiDB + TiFlash 特有的 RPC,其想法也很簡單,就是對同一個 TiFlash 節點,只發送一個 request,這個 request 裡面包含了所有需要讀取的 region 資訊。顯然這個模式不但能減少 RPC,而且儲存層能一次性的看到所有需要掃描的資料,也讓儲存層有了更大的優化空間。

儘管在引入 BatchCoprocessor 之後,Coprocessor 的兩個主要缺點都得到了解決,但是因為無論是 BatchCoprocessor 還是 Coprocessor 都只是支援對單表的 query,遇到複雜 sql,其大部分工作還是需要在 root executor 上單機執行,以下面這個兩表 join 的 plan 為例:

img

只有 TableScan 和 Selection 部分可以在 TiFlash 中執行,而之後的 Join 和 Agg 都需要在 TiDB 執行,這顯然極大的限制了計算層的擴充套件性。為了從架構層面解決這個問題,在 TiFlash 5.0 中,我們正式引入了 MPP 的計算架構:

img

引入 MPP 之後,TiFlash 支援的 query 部分得到了極大的豐富,對於理想情況下,root executor 直接退化為一個收集結果的 TableReader,剩下部分都會下推給 TiFlash,從而從根本上解決了 TiDB 中計算能力無法橫向擴充套件的問題。

DAGRequest 到 BlockInputStream

在 TiFlash 內部,接收到 TiDB 的 request 之後,首先會得到 TiDB 的 plan,在 TiFlash 中,稱之為 DAGRequest,它是一個基於 protobuf 協議的一個定義,一些主要的部分如下:

img

值得一提的就是 DAGRequest 中有兩個 executor 相關的 field:

  • executors:這個是引入 TiFlash 之前的定義,其表示一個 executor 的陣列,而且裡面的 executor 最多就三個:一個 scan(tablescan 或者 indexscan),一個 selection,最後一個 agg/topN/limit;

  • root_executors:顯然上面那個 executors 的定義過於簡單,無法描述 MPP 時的 plan,所以在引入 MPP 之後我們加了一個 root_executor 的 field,它是一個 executor 的 tree。

在得到 executor tree 之後,TiFlash 會進行編譯,在編譯的時候有一箇中間資料結構是 DAGQueryBlock,TiDB 會先將 executor tree 轉成 DAGQueryBlock 的tree,然後對 DAGQueryBlock 的 tree 進行後序遍歷來編譯。

DAGQueryBlock 的定義和原始的 executor 陣列很類似,一個 DAGQueryBlock 包含的 executor 如下:

SourceExecutor Selection Having

其中 SourceExecutor 包含真正的 source executor 比如 tablescan 或者 exchange receiver,以及其他所有不符合上述 executor 陣列 pattern 的 executor,如 join,project 等。

可以看出來 DAGQueryBlock 是從 Coprocessor 時代的 executor 陣列發展而來的,這個結構本身並沒有太多的意義,而且也會影響很多潛在的優化,在不久的將來,應該會被移除掉。

在編譯過程中,有兩個 TiDB 體系特有的問題需要解決:

  • 如何保證 TiFlash 的資料與 TiKV 的資料保持強一致性;

  • 如何處理 Region error。

對於第一個問題,我們引入了 Learner read 的過程,即在 TiFlash 編譯 tablescan 之前,會用 start_ts 向 raft leader 查詢截止到該 start_ts 時,raft 的 index 是多少,在得到該 index 之後,TiFlash 會等自己這個 raft leaner 的 index 追上 leader 的 index。

對於第二個問題,我們引入了 Remote reader 的概念,即如果 TiFlash 遇到了 region error,那麼如果是 BatchCoprocessor 和 MPP request,那 TiFlash 會主動向其他 TiFlash 節點發 Coprocessor request 來拿到該 region 的資料。

在把 DAGRequest 編譯成 BlockInputStream 之後,就進入了向量化執行的階段,在向量化執行的時候,有兩個基本的概念:

  • Block:是執行期的最小資料單元,它由一個 column 的陣列組成;

  • BlockInputStream:相當於執行框架,每個 BlockInputStream 都有一個或者多個 child,執行時採用了 pull 的模型。下面是執行時的虛擬碼:

img

BlockInputStream 可以分為兩類:

  • 用於做計算的,例如:

    • DMSegmentThreadInputStream:與儲存互動的 InpuStream,可以簡單理解為是 table scan;

    • ExchangeReceiverInputStream:從遠端讀資料的 InputStream;

    • ExpressionBlockInputStream:進行 expression 計算的 InputStream;

    • FilterBlockInputStream:對資料進行過濾的 InputStream;

    • ParallelAggregatingBlockInputStream:做資料進行聚合的 InputStream。

  • 用於併發控制的,例如:

    • UnionBlockInputStream:把多個 InputStream 合成一個 InputStream;

    • ParallelAggregatingBlockInputStream:和 Union 類似,不過還會做一個額外的資料聚合;

    • SharedQueryBlockInputStream:把一個 InputStream 擴散成多個 InputStream。

img

用於計算的 InputStream 與用於併發控制的 InputStream 最大的不同在於用於計算的 InputStream 自己不管理執行緒,它們只負責在某個執行緒裡跑起來,而用於併發控制的 InputStream 會自己管理執行緒,如上所示,Union,ParallelAggregating 以及 SharedQuery 都會在自己內部維護一個執行緒池。當然有些併發控制的 InputStream 自己也會完成一些計算,比如 ParallelAggregatingBlockInputStream。

MPP

在介紹完 TiFlash 計算層中基本的編譯以及執行框架之後,我們重點再介紹下 MPP。

MPP 在 API 層共有三個:

  • DispatchMPPTask:用於 TiDB 向 TiFlash 傳送 plan;

  • EstablishMPPConnectionSyncOrAsync:用於 MPP 中上游 task 向下遊 task 發起讀資料的請求,因為無論是讀的資料量以及讀的時間會比較長,所以這個 RPC 是 streaming 的 RPC;

  • CancelMPPTask:用於 TiDB 端 cancel MPP query。

在執行 MPP query 的時候,首先由 TiDB 生成 MPP task,TiDB 用 DispatchMPPTask 來將 task 分發給各個 TiFlash 節點,然後 TiDB 與 TiFlash 會用 EstablishMPPConnection 來建立起各個 task 之間的連線。

與 BatchCoprocessor 相比,MPP 的核心概念是 Exchange,用於 TiFlash 節點之間的資料交換,在 TiFlash 中有三種 exchange 的型別:

  • Broadcast:即將一份資料 broadcast 到多個目標 mpp task;

  • HashPartition:即將一份資料用 hash partition 的方式切分成多個 partition,然後傳送給目標 mpp task;

  • PassThrough:這個與 broadcast 幾乎一樣,不過 PassThrough 的目標 task 只能有一個,通常用於 MPP task 給 TiDB 返回結果。

img

上圖是 Exchange 過程中的一些關鍵資料結構,主要有如下幾個:

  • 接收端

    • ExchangeReceiver:用於向其他 task 建立連線,接收資料並放在 result queue;

    • ExchangeReceiverInputStream:執行框架中的一個 InputStream,多個 ER Stream 共同持有一個 ExchangeReceiver,並從其 result queue 中讀資料。

  • 傳送端

    • MPPTunnel:持有 grpc streaming writer,用於將計算結果傳送給其他 task,目前有三種模式:

      • Sync Tunnel:用 sync grpc 實現的 tunnel;

      • Async Tunnel:用 async grpc 實現的 tunnel;

      • Local Tunnel:對於處於同一個節點的不同 task,他們之間的 Tunnel 不走 RPC,在記憶體裡傳輸資料即可。

    • MPPTunnelSet:同一個 ExchangeSender 可能需要向多個 mpp task 傳輸資料,所以會有多個 MPPTunnel,這些 MPPTunnel 在一起組成一個 MPPTunnelSet。

    • StreamingDAGResponseWriter:持有 MPPTunnelSet,主要做一些傳送之前的資料預處理工作:

      • 將資料 encode 成協議規定的格式;

      • 如果 Exchange Type 是 HashPartition 的話,還需要負責把資料進行 Hash partition 的切分。

    • ExchangeSenderBlockInputStream:執行框架中的一個 InputStream,持有 StreamingDAGResponseWriter,把計算的結果傳送給 writer。

除了 Exchange,MPP 還有一個重要部分是 MPP task 的管理,與 BatchCoprocessor/Coprocessor 不同,MPP query 的多個 task 需要有一定的通訊協作,所以 TiFlash 中需要有對 MPP task 的管理模組。其主要的資料結構如下:

  • MPPTaskManager:全域性的 instance 用來管理該 TiFlash 節點上所有的 MPP task;

  • MPPQueryTaskSet:屬於同一個 query 的所有 MPP task 集合,在諸如 CancelMPPTask 時用於快速找到所有的目標 task;

  • MPPTask:一個 MPP query 中的最基本單元,不同 MPP task 之間通過 Exchange 來交換資料。

以上就是 TiFlash 中 MPP 的相關實現,可以看出目前這個實現還是比較樸素的。在隨後的測試和使用中,我們很快發現一些問題,主要有兩個問題:

第一個問題:對於一些 sql 本身很複雜,但是資料量(計算量)卻不大的 query,我們發現,無論怎麼增加 query 的併發,TiFlash 的 cpu 利用率始終會在 50% 以下。經過一系列的研究之後我們發現 root cause 是目前 TiFlash 的執行緒使用是需要時申請,結束之後即釋放的模式,而頻繁的執行緒申請與釋放效率非常低,直接導致了系統 cpu 使用率無法超過 50%。解決該問題的直接思路即使用執行緒池,但是由於我們目前 task 使用執行緒的模式是非搶佔的,所以對於固定大小的執行緒池,因為系統中沒有全域性的排程器,會有死鎖的風險,為此我們引入了 DynamicThreadPool,在該執行緒池中,執行緒主要分為兩類:

  • 固定執行緒:長期存在的執行緒

  • 動態執行緒:按需申請的執行緒,不過與之前的執行緒不同的是,該執行緒在結束當前任務之後會等一段時間,如果沒有新的任務的話,才會退出

第二個問題和第一個問題類似,也是執行緒相關的,即 TiFlash 在遇到高併發的 query 時,因為執行緒使用沒有很好的控制,會導致 TiFlash server 遇到無法分配出執行緒的問題,為了解決此問題,我們必須控制 TiFlash 中同時使用的執行緒,在跑 MPP query 的時候,執行緒主要可以分為兩部分:

  • 完全分散式的排程器,僅依賴 TiFlash 節點自身的資訊

  • 基本的原理為 MinTSOScheduer 保證 TiFlash 節點上最小的 start_ts 對應的所有 MPP task 能正常執行。因為全域性最小的 start_ts 在各個節點上必然也是最小的 start_ts,所以 MinTSOScheduer 能夠保證全域性至少有一條 query 能順利執行從而保證整個系統不會有死鎖,而對於非最小 start_ts 的 MPP task,則根據當前系統的執行緒使用情況來決定是否可以執行,所以也能達到控制系統執行緒使用量的目的。

IO 執行緒:主要指用於 grpc 通訊的執行緒,在減小 grpc 執行緒使用方面,我們基本上是採用了業界的成熟方案,即用 async 的方式,我們實現了 async 的 grpc server 和 async 的 grpc client,大大減小了 IO 執行緒的使用量

計算執行緒:為了控制計算執行緒,我們必須引入排程器,該排程器有兩個最低目標:不造成死鎖以及最大程度控制系統的執行緒使用量,最後我們在 TiFlash 裡引入了 MinTSOScheduer:

  • 完全分散式的排程器,僅依賴 TiFlash 節點自身的資訊

  • 基本的原理為 MinTSOScheduer 保證 TiFlash 節點上最小的 start_ts 對應的所有 MPP task 能正常執行。因為全域性最小的 start_ts 在各個節點上必然也是最小的 start_ts,所以 MinTSOScheduer 能夠保證全域性至少有一條 query 能順利執行從而保證整個系統不會有死鎖,而對於非最小 start_ts 的 MPP task,則根據當前系統的執行緒使用情況來決定是否可以執行,所以也能達到控制系統執行緒使用量的目的。

總結

本文主要系統性地介紹了 TiFlash 計算層的基本概念,包括架構的演進,TiFlash 內部對 TiDB plan 的處理以及 MPP 基本原理等,以期望讀者能夠對 TiFlash 計算層有一個初步的瞭解。後續還會有一些具體實現諸如 TiFlash 表示式以及算子系統的細節介紹,敬請期待。