TiFlash 原始碼閱讀(九)TiFlash 中常用運算元的設計與實現

語言: CN / TW / HK

TiFlash 是 TiDB 的分析引擎,是 TiDB HTAP 形態的關鍵元件,TiFlash 原始碼閱讀系列文章將從原始碼層面介紹 TiFlash 的內部實現。在上一期原始碼閱讀 ,我們介紹了 TiFlash 表示式的實現與設計,包括 TiFlash 表示式的基本概念:表示式體系,標量函式、聚合函式等。

本文主要介紹了資料庫系統中常用的運算元 Join 和 Aggregation 在 TiFlash 中的執行情況,包括查詢計劃生成、編譯階段與執行階段,以期望讀者對 TiFlash 的運算元有初步的瞭解。

本文作者:齊智 , TiFlash 研發 工程師。

運算元概要

在閱讀本文之前,推薦閱讀本系列的前作: TiFlash 原始碼閱讀(二)計算層概覽 ,以對 TiFlash 計算層、MPP 框架有一定了解。

在資料庫系統中,運算元是執行 SQL 主要邏輯的地方。一條 SQL 會被 parser 解析為一棵運算元樹(查詢計劃),然後經過 optimizer 的優化,再交給對應的 executor 執行,如下圖所示。

本文的主要內容包括

  1. TiDB 如何生成與優化 MPP 運算元與查詢計劃

  2. Join 運算元在 TiFlash 中的編譯(編譯指的是將 TiDB-server 下發的執行計劃片段生成可執行結構的過程,下同)與執行

  3. Aggregation 運算元在 TiFlash 中的編譯與執行

構建查詢計劃

一些背景知識:

  1. 邏輯計劃與物理計劃:可以簡單理解為邏輯計劃是指運算元要做什麼,物理計劃是指運算元怎樣去做這件事。比如,“將資料從表 a 和表 b 中讀取出來,然後做 join”描述的是邏輯計劃;而“在 TiFlash 中做 shuffle hash join” 描述的是物理計劃。更多資訊可以參閱: TiDB 原始碼閱讀系列文章

  2. MPP:大規模平行計算,一般用來描述節點間可以交換資料的平行計算,在當前版本(6.1.0,下同)的 TiDB 中,MPP 運算都發生在 TiFlash 節點上。推薦觀看: 原始碼解讀 - TiFlash 計算層 overview 。MPP 是物理計劃級別的概念。

MPP 計劃

在 TiDB 中,可以在 SQL 前加上 explain 來檢視這條 SQL 的查詢計劃,如下圖所示,是一棵由物理運算元組成的樹,可以檢視 TiDB 執行計劃概覽 (複製連結至瀏覽器檢視: http://docs.pingcap.com/zh/tidb/stable/explain-overview ) 來對其有更多的瞭解。

MPP 查詢計劃的獨特之處在於查詢計劃中多出了用於進行資料交換的 ExchangeSender 和 ExchangeReceiver 運算元。

執行計劃中會有這樣的 pattern,代表將會在此處進行資料傳輸與交換。

     ...
|_ExchangeReceiver_xx
|_ ExchangeSender_xx

每個 ExchangeSender 都會有一個 ExchangeType,來標識本次資料傳輸的類別,包括:

  1. HashPartition,將資料按 Hash 值進行分割槽之後分發到上游節點。

  2. Broadcast,將自身資料拷貝若干份,廣播到所有上游節點中。

  3. PassThrough,將自己的資料全部傳給一個指定節點,此時接收方可以是 TiFlash 節點(ExchangeReceiver);也可以是 TiDB-server 節點(TableReader),代表 MPP 運算完畢,向 TiDB-server 返回資料。

在上面的查詢計劃圖中,一共有三個 ExchangeSender,id 分別是 19, 13 和 17。其中 ExchangeSender_13 和 ExchangeSender_17 都是將讀入後的資料按雜湊值 shuffle 到所有節點中,以便進行 join,而 ExchangeSender_19 則是將 join 完成後的資料返回到 TiDB-server 節點中。

新增 Exchange

在優化器的計劃探索過程中,會有兩處為查詢計劃樹插入 Exchange 運算元:

  1. 一個是 MPP 計劃在探索完畢後,接入 TiDB 的 tableReader 時。型別為 passThrough type. 原始碼在函式 func (t *mppTask) convertToRootTaskImpl

  2. 一個是 MPP 計劃在探索過程中,發現當前運算元的 property(這裡主要指分割槽屬性)不滿足上層要求時。例如上層要求需要按 a 列的 hash 值分割槽,但是下層運算元不能滿足這個要求,就會插入一組 Exchange.

func (t *mppTask) enforceExchanger(prop *property.PhysicalProperty) *mppTask {
if !t.needEnforceExchanger(prop) {
return t
}
return t.copy().(*mppTask).enforceExchangerImpl(prop)
}

// t.partTp 表示當前運算元已有的 partition type,prop 表示父運算元要求的 partition type
func (t *mppTask) needEnforceExchanger(prop *property.PhysicalProperty) bool {
switch prop.MPPPartitionTp {
case property.AnyType:
return false
case property.BroadcastType:
return true
case property.SinglePartitionType:
return t.partTp != property.SinglePartitionType
default:
if t.partTp != property.HashType {
return true
}
if len(prop.MPPPartitionCols) != len(t.hashCols) {
return true
}
for i, col := range prop.MPPPartitionCols {
if !col.Equal(t.hashCols[i]) {
return true
}
}
return false
}
}

Property 對於分割槽屬性的要求(MPPPartitionTp)有以下幾種:

  1. AnyType,對下層運算元沒有要求,所以並不需要新增 exchange;

  2. BroadcastType,用於 broadcast join,要求下層節點複製資料並廣播到所有節點中,此時一定需要新增一個 broadcast exchange;

  3. SinglePartitionType,要求下層節點將資料彙總到同一臺節點中,此時如果已經在同一臺節點上,則不用再進行 exchange。

  4. HashType,要求下層節點按特定列的雜湊值進行分割槽,如果已經按要求分好區了,則不用再進行 exchange.

在優化器的生成查詢計劃的探索中,每個運算元都會對下層有 property 要求,同時也需要滿足上層傳下來的 property;當上下兩層的 property 無法匹配時,就插入一個 exchange 運算元交換資料。依靠這些 property,可以不重不漏的插入 exchange 運算元。

MPP 演算法

是否選擇 MPP 演算法是在 TiDB 優化器 生成物理計劃 時決定,即 CBO(Cost-Based Optimization) 階段。優化器會遍歷所有可選擇的計劃路徑,包括含有 MPP 演算法的計劃與不含有 MPP 演算法的計劃,估計它們的代價,並選擇其中總代價最小的一個查詢計劃。

對於當前的 TiDB repo 程式碼,有四個位置可以觸發 MPP 計劃的生成,分別對應於 join、agg、window function、projection 四個運算元:

  1. func (p *LogicalJoin) tryToGetMppHashJoin

  2. func (la *LogicalAggregation) tryToGetMppHashAggs

  3. func (lw *LogicalWindow) tryToGetMppWindows

  4. func (p *LogicalProjection) exhaustPhysicalPlans

這裡只描述具有代表性的 join 和 agg 運算元,其他運算元同理。

Join

當前 TiDB 支援兩種 MPP Join 演算法,分別是:

  • Shuffle Hash Join,將兩張表的資料各自按 hash key 分割槽後 shuffle 到各個節點上,然後做 hash join,如上一節中舉出的查詢計劃圖所示。

  • Broadcast Join,將小表廣播到大表所在的每個節點,然後做 hash join,如下圖所示。

tryToGetMppHashJoin 函式在構建 join 運算元時給出了對子運算元的 property 要求:

if useBCJ { // broadcastJoin

childrenProps[buildside] = {MPPPartitionTp: BroadcastType}
childrenProps[1-buildside] = {MPPPartitionTp: AnyType}

} else { // shuffle hash join

childrenProps[0] = {MPPPartitionTp: HashType, key: leftKeys}
childrenProps[1] = {MPPPartitionTp: HashType, key: rightKeys}

}

如程式碼所示,broadcast join 要求 buildside(這裡指要廣播的小表)具有一個 BroadcastType 的 property,對大表側則沒有要求。而 shuffle hash join 則要求兩側都具有 HashType 的分割槽屬性,分割槽列分別是 left keys 和 right keys。

Aggregation

當前 tryToGetMppHashAggs 可能生成三種 MPP Aggregation 計劃:

  1. “一階段 agg”,要求資料先按 group by key 分割槽,然後再進行聚合。

2. “兩階段 agg”,首先在本地節點進行第一階段聚合,然後按 group by key 分割槽,再進行一次聚合(用 sum 彙總結果)。

3. “scalar agg”,沒有分割槽列的特定情況,在本地節點進行第一階段聚合,然後彙總到同一臺節點上完成第二階段聚合。

一階段 agg 和兩階段 agg 的區別是是否先在本地節點做一次預聚合,優化器會根據 SQL 與代價估算來選擇執行哪種方式。對於重複值很多的情況,兩階段 agg 可以在網路傳輸前減少很多資料量,從而減少大量的網路消耗;而如果重複值很少的情況下,這次預聚合並不會減少很多資料量,反而白白增大了 cpu 與記憶體消耗,此時就不如使用一階段 agg。

這裡留一個小思考題,這三種 agg 各自對下方有什麼 property 要求?在聚合做完之後又滿足了怎樣的 property?

答案是:

一階段 agg 要求 hash,做完滿足 hash;二階段 agg 無要求,做完滿足 hash;scalar agg 無要求,做完滿足 singlePartition.

編譯與執行

執行計劃構建好之後,TiDB-server 會將 dag(執行計劃的片段)下發給對應的 TiFlash 節點。在 TiFlash 節點中,需要首先解析這些執行計劃,這個過程我們稱作“編譯”,編譯的結果是 BlockInputStream,它是 TiFlash 中的可執行結構;而最後一步就是在 TiFlash 中執行這些 BlockInputStream.

下圖是一個 BlockInputStream DAG 的例子,每個 BlockInputStream 都有三個方法:readPrefix, read 和 readSuffix;類似於其他火山模型呼叫 open、next 和 close。

下圖的來源是 TiFlash 執行器執行緒模型複製連結至瀏覽器檢視: http://zhuanlan.zhihu.com/p/500254430 ,關於執行模型更多的內容,可以參考這篇文章或者 TiFlash Overview,這裡不再贅述。

Join 的編譯與執行

TiDB-server 節點會將查詢計劃按 Exchange 會作為分界,將查詢切分為不同的計劃片段(task),作為 dag 發給 TiFlash 節點。比如對於下圖中所示的查詢計劃,會切分為這三個紅框。

TiFlash 節點在編譯完成後生成的 BlockInputStream 如下,可以在 debug 日誌中看到:

task 1
ExchangeSender
Expression: <final projection>
Expression: <projection after push down filter>
Filter: <push down filter>
DeltaMergeSegmentThread

task 2
ExchangeSender
Expression: <final projection>
Expression: <projection after push down filter>
Filter: <push down filter>
DeltaMergeSegmentThread

task 3
CreatingSets
Union: <for join>
HashJoinBuildBlockInputStream x 20: <join build, build_side_root_executor_id = ExchangeReceiver_15>, join_kind = Inner
Expression: <append join key and join filters for build side>
Expression: <final projection>
Squashing: <squashing after exchange receiver>
TiRemoteBlockInputStream(ExchangeReceiver): schema: {<exchange_receiver_0, Nullable(Int32)>, <exchange_receiver_1, Nullable(Int32)>}
Union: <for mpp>
ExchangeSender x 20
Expression: <final projection>
Expression: <remove useless column after join>
HashJoinProbe: <join probe, join_executor_id = HashJoin_34>
Expression: <final projection>
Squashing: <squashing after exchange receiver>
TiRemoteBlockInputStream(ExchangeReceiver): schema: {<exchange_receiver_0, Nullable(Int32)>, <exchange_receiver_1, Nullable(Int32)>}

其中 task1 和 task2 是將資料從儲存層讀出,經過簡單的處理之後,發給 ExchangeSender. 在 task3 中,有三個 BlockInpuStream 值得關注,分別是:CreatingSets, HashJoinBuild, HashJoinProbe.

CreatingSetsBlockInputStream

接受一個數據 BlockInputStream 表示 joinProbe,還有若干個代表 JoinBuild 的 Subquery。CreatingSets 會併發啟動這些 Subquery, 等待他們執行結束後在開始啟動資料 InputStream. 下面兩張圖分別是 CreatingSets 的 readPrefix 和 read 函式的呼叫棧。

為什麼 CreatingSets 可能同時建立多張雜湊表?因為在一個多表 join 中,同一個計劃片段可能緊接著做多次 join porbe,如下圖所示:

task:4
CreatingSets
Union x 2: <for join>
HashJoinBuildBlockInputStream x 20: <join build, build_side_root_executor_id = ExchangeReceiver_22>, join_kind = Left
Expression: <append join key and join filters for build side>
Expression: <final projection>
Squashing: <squashing after exchange receiver>
TiRemoteBlockInputStream(ExchangeReceiver): schema: {<exchange_receiver_0, Nullable(Int32)>, <exchange_receiver_1, Nullable(Int32)>}
Union: <for mpp>
ExchangeSender x 20
Expression: <final projection>
Expression: <remove useless column after join>
HashJoinProbe: <join probe, join_executor_id = HashJoin_50>
Expression: <final projection>
Expression: <remove useless column after join>
HashJoinProbe: <join probe, join_executor_id = HashJoin_14>
Expression: <final projection>
Squashing: <squashing after exchange receiver>
TiRemoteBlockInputStream(ExchangeReceiver): schema: {<exchange_receiver_0, Nullable(Int32)>, <exchange_receiver_1, Nullable(Int32)>}

Join Build

注意,join 在此處僅代表 hash join,已經與網路通訊和 MPP 級別的演算法無關。

關於 join 的程式碼都在 dbms/src/Interpreters/Join.cpp 中;我們以下面兩張表進行 join 為例來說明:

left_table l join right_table r 
on l.join_key=r.join_key
where l.b>=r.c

預設右表做 build 端,左表做 probe 端。雜湊表的值使用鏈式儲存:

Join Probe

這裡主要描述的是 JoinBlockImpl 這個函式的流程:

  1. block 包含了左表的內容;建立 added_columns, 即要新增到 block 中的右表的列;然後建立相應的過濾器 replicate_offsets:表示當前共匹配了幾行,之後可以用於篩選未匹配上的行,或複製匹配了多行的行。

2. 依次查詢雜湊表,根據查詢結果呼叫相應的 addFound 或 addNotFound 函式,填充 added_columns 和過濾器。

從填充的過程中也可以看到,replicate_offsets 左表表示到當前行為止,一共能匹配上的右表的行數。並且 replicate_offsets[i] - replicate_offsets[i-1] 就表示左表第 i 行匹配到的右表的行數。

3. 將 added_column 直接拼接到 block 上,此時會有短暫的 block 行數不一致。

4. 根據過濾器的內容,複製或過濾掉原先左表中的行。

5. 最後在 block 上處理 other condition,則得到了 join 的結果。

上文中描述的是對於正常的 “all” join 的情況,需要返回左右表的資料。與之相對的則是 “any” join,表示半連線,無需返回右表,只需返回左表的資料,則無需使用 replicate_offsets 這個輔助陣列,讀者可以自行閱讀程式碼。仍然在 dbms/src/intepreters/Join.cpp 中。

Aggregation 的編譯與執行

還是以一個查詢計劃以及對應的 BlockInputStream 為例:

task:1
ExchangeSender
Expression: <final projection>
Expression: <before order and select>
Aggregating
Concat
Expression: <before aggregation>
Expression: <projection>
Expression: <before projection>
Expression: <final projection>
DeltaMergeSegmentThread

task:2
Union: <for mpp>
ExchangeSender x 20
Expression: <final projection>
Expression: <projection>
Expression: <before projection>
Expression: <final projection>
SharedQuery: <restore concurrency>
ParallelAggregating, max_threads: 20, final: true
Expression x 20: <before aggregation>
Squashing: <squashing after exchange receiver>
TiRemoteBlockInputStream(ExchangeReceiver): schema: {<exchange_receiver_0, Int64>, <exchange_receiver_1, Nullable(Int64)>}

從查詢計劃中可以看到這是一個兩階段 agg,第一階段對應 task1,執行聚合的 BlockInputStream 是 Aggregating。第二階段對應 task2,執行聚合的 BlockInputStream 是 ParallelAggragating。兩個 task 通過 Exchange 進行網路資料傳輸。

在 aggregation 的編譯期,會檢查當前 pipeline 能夠提供的並行度,如果只有 1,則使用 AggregatingBlockInputStream 單執行緒執行,如果大於 1 則使用 ParallelAggragating 並行執行。

DAGQueryBlockInterpreter::executeAggregation(){
if (pipeline.streams.size() > 1){
ParallelAggregatingBlockInputStream
}else {
AggregatingBlockInputStream
}
}

AggregatingBlockInputStream 的呼叫棧如下:

ParallelAggregatingBlockInputStream 內部會分兩階段操作(這裡的兩階段是內部執行中的概念,發生在同一臺節點上,和查詢計劃中的兩階段不是一個概念)。partial 階段分別在 N 個執行緒構建 HashTable,merge 階段則將 N 個 HashTable 合併起來,對外輸出一個流。呼叫棧如下:

如果 result 是空,那麼會單獨呼叫一次 executeOnBlock 方法,來生成一個預設資料,類似於 count() 沒有輸入時,會返回一個 0.

兩種執行方式都用到了 Aggregator 的 executeOnBlock 方法和 mergeAndConvertToBlocks 方法,他們的呼叫棧如圖所示。前者是實際執行聚合函式的地方,會呼叫聚合函式的 add 方法,將資料值加入;後者的主要目的是將 ParallelAggregating 並行生成的雜湊表合併。

:bulb: 

點選文末 【閱讀原文】 立即下載試用 TiDB!