袋鼠雲數棧基於CBO在Spark SQL優化上的探索

語言: CN / TW / HK

原文連結:袋鼠雲數棧基於CBO在Spark SQL優化上的探索

一、Spark SQL CBO選型背景

Spark SQL的優化器有兩種優化方式:一種是基於規則的優化方式(Rule-Based Optimizer,簡稱為RBO);另一種是基於代價的優化方式(Cost-Based Optimizer,簡稱為CBO)。

1、RBO是傳統的SQL優化技術

RBO是發展比較早且比較成熟的一項SQL優化技術,它按照制定好的一系列優化規則對SQL語法表示式進行轉換,最終生成一個最優的執行計劃。RBO屬於一種經驗式的優化方法,嚴格按照既定的規則順序進行匹配,所以不同的SQL寫法直接決定執行效率不同。且RBO對資料不敏感,在表大小固定的情況下,無論中間結果資料怎麼變化,只要SQL保持不變,生成的執行計劃就都是固定的。

2、CBO是RBO改進演化的優化方式

CBO是對RBO改進演化的優化方式,它能根據優化規則對關係表示式進行轉換,生成多個執行計劃,在根據統計資訊(Statistics)和代價模型(Cost Model)計算得出代價最小的物理執行計劃。

3、 CBO與RBO優勢對比

● RBO優化例子

下面我們來看一個例子:計算t1表(大小為:2G)和t2表(大小為:1.8G)join後的行數 file

上圖是:

SELECT COUNT(t1.id) FROM t1 JOIN t2 ON t1.id = t2.id WHERE t1.age > 24

基於RBO優化後生成的物理執行計劃圖。在圖中我們可以看出,執行計劃最後是選用SortMergeJoin ⑴ 進行兩個表join的。

在Spark中,join的實現有三種:

1.Broadcast Join

2.ShuffleHash Join

3.SortMerge Join

ShuffleHash Join和SortMerge Join都需要shuffle,相對Broadcast Join來說代價要大很多,如果選用Broadcast Join則需要滿足有一張表的大小是小於等於

spark.sql.autoBroadcastJoinThreshold 的大小(預設為10M)。

而我們再看,上圖的執行計劃t1表,原表大小2G過濾後10M,t2表原表大小1.8G過濾後1.5G。這說明RBO優化器不關心中間資料的變化,僅根據原表大小進行join的選擇了SortMergeJoin作為最終的join,顯然這得到的執行計劃不是最優的。

● CBO優化例子

而使用CBO優化器得到的執行計劃圖如下: file

我們不難看出,CBO優化器充分考慮到中間結果,感知到中間結果的變化滿足能Broadcast Join的條件,所以生成的最終執行計劃會選擇Broadcast Join來進行兩個表join。

● 其他優勢

其實除了刻板的執行導致不能得到最優解的問題,RBO還有學習成本高的問題:開發人員需要熟悉大部分優化規則,否則寫出來的SQL效能可能會很差。

● CBO是數棧Spark SQL 優化的更佳選擇

相對於RBO,CBO無疑是更好的選擇,它使Spark SQL的效能提升上了一個新臺階,Spark作為數棧平臺底層非常重要的元件之一,承載著離線開發平臺上大部分任務,做好Spark的優化也將推動著數棧在使用上更加高效易用。所以數棧選擇CBO做研究探索,由此進一步提高數棧產品效能。

二、Spark SQL CBO實現原理

Spark SQL中實現CBO的步驟分為兩大部分,第一部分是統計資訊收集,第二部分是成本估算:

1、統計資訊收集

統計資訊收集分為兩個部分:第一部分是原始表資訊統計、第二部分是中間運算元的資訊統計。

1)原始表資訊統計

Spark中,通過增加新的SQL語法ANALYZE TABLE來用於統計原始表資訊。原始表統計資訊分為表級別和列級別兩大類,具體的執行如下所示:

● 表級別統計資訊

通過執行 ANALYZE TABLE table_name COMPUTE STATISTICS 語句來收集,統計指標包括estimatedSize解壓後資料的大小、rowCount資料總條數等。

● 列級別統計資訊

通過執行 ANALYZE TABLE table_name COMPUTE STATISTICS FOR COLUMNS column-name1, column-name2, …. 語句來收集。

列級別的資訊又分為基本列資訊和直方圖,基本列資訊包括列型別、Max、Min、number of nulls, number of distinct values, max column length, average column length等,直方圖描述了資料的分佈。Spark預設沒有開啟直方圖統計,需要額外設定引數:spark.sql.statistics.histogram.enabled = true。

原始表的資訊統計相對簡單,推算中間節點的統計資訊相對就複雜一些,並且不同的運算元會有不同的推算規則,在Spark中運算元有很多,有興趣的同學可以看Spark SQL CBO設計文件:

https://issues.apache.org/jira/secure/attachment/12823839/Spark_CBO_Design_Spec.pdf

2)中間運算元的資訊統計

我們這裡以常見的filter運算元為例,看看推算運算元統計資訊的過程。基於上一節的SQL SELECT COUNT(t1.id) FROM t1 JOIN t2 ON t1.id = t2.id WHERE t1.age > 24生成的語法樹來看下t1表中包含大於運算子 filter節點的統計資訊。 圖片

在這裡需要分三種情況考慮:

第一種

過濾條件常數值大於max(t1.age),返回結果為0;

第二種

過濾條件常數值小於min(t1.age),則全部返回;

第三種

過濾條件常數介於min(t1.age)和max(t1.age)之間,當沒有開啟直方圖時過濾後統計資訊的公式為after_filter = (max(t1.age) - 過濾條件常數24)/(max(t1.age) – min(t1.age)) * before_filter,沒有開啟直方圖則預設任務資料分佈是均勻的;當開啟直方圖時過濾後統計資訊公式為after_filter = height(>24) / height(All) * before_filter。然後將該節點min(t1.age)等於過濾條件常數24。

2、成本估算

介紹完如何統計原始表的統計資訊和如何計算中間運算元的統計資訊,有了這些資訊後就可以計算每個節點的代價成本了。

在介紹如何計算節點成本之前我們先介紹一些成本引數的含義,如下:

Hr: 從 HDFS 讀取 1 個位元組的成本
	
	Hw: 從 HDFS 寫1 個位元組的成本

NEt: 在 Spark 叢集中通過網路從任何節點傳輸 1 個位元組到 目標節點的平均成本

Tr: 資料總條數

Tsz: 資料平均大小

CPUc: CPU 成本

計算節點成本會從IO和CPU兩個維度考慮,每個運算元成本的計算規則不一樣,我們通過join運算元來舉例說明如何計算運算元的成本:

假設join是Broadcast Join,大表分佈在n個節點上,那麼CPU代價和IO代價計算公式分別如下:

CPU Cost=小表構建Hash Table的成本 + 大表探測的成本 = Tr(Rsmall) * CPUc + (Tr(R1) + Tr(R2) + … + Tr(Rn)) * n * CPUc

IO Cost =讀取小表的成本 + 小表廣播的成本 + 讀取大表的成本 = Tr(Rsmall) * Tsz(Rsmall) * Hr + n * Tr(Rsmall) * Tsz(Rsmall) * NEt + (Tr(R1)* Tsz(R1) + … + Tr(Rn) * Tsz(Rn)) * Hr

但是無論哪種運算元,成本計算都和參與的資料總條數、資料平均大小等因素直接相關,這也是為什麼在這之前要先介紹如何統計原表資訊和推算中間運算元的統計資訊。

每個運算元根據定義的規則計算出成本,每個運算元成本相加便是整個執行計劃的總成本,在這裡我們可以考慮一個問題,最優執行計劃是列舉每個執行計劃一個個算出每個的總成本得出來的嗎?顯然不是的,如果每個執行計劃都計算一次總代價,那估計黃花菜都要涼了,Spark巧妙的使用了動態規劃的思想,快速得出了最優的執行計劃。

三、數棧在Spark SQL CBO上的探索

瞭解完Spark SQL CBO的實現原理之後,我們來思考一下第一個問題:大資料平臺想要實現支援Spark SQL CBO優化的話,需要做些什麼?

在前文實現原理中我們提到,Spark SQL CBO的實現分為兩步,第一步是統計資訊收集,第二步是成本估算。而統計資訊收集又分為兩步:第一步的原始表資訊統計、第二步中間運算元的資訊統計。到這裡我們找到了第一個問題的答案:平臺中需要先有原始表資訊統計的功能。

第一個問題解決後,我們需要思考第二個問題:什麼時候進行表資訊統計比較合適?針對這個問題,我們初步設想了三種解決資訊統計的方案:

● 在每次SQL查詢前,先進行一次表資訊統計

這種方式得到的統計資訊比較準確,經過CBO優化後得出的執行計劃也是最優的,但是資訊統計的代價最大。

● 定期重新整理表統計資訊

每次SQL查詢前不需要進行表資訊統計,因為業務資料更新的不確定性,所以這種方式進行SQL查詢時得到的表統計資訊可能不是最新的,那麼CBO優化後得到的執行計劃有可能不是最優的。

● 在變更資料的業務方執行資訊統計

這種方式對於資訊統計的代價是最小的,也能保證CBO優化得到的執行計劃是最優的,但是對於業務程式碼的侵入性是最大的。

不難看出三種方案各有利弊,所以進行表資訊統計的具體方案取決於平臺本身的架構設計。

基於數棧平臺建設數倉的結構圖如下圖所示: file

首先通過ChunJun將業務資料庫資料採集到Hive ODS層

然後通過Hive或者Spark進行資料處理

最後通過ChunJun將Hive庫的資料寫入到業務資料庫用於業務處理

從結構圖可看出數棧有用到Hive、Spark和ChunJun三個元件,並且這三個元件都會讀寫Hive, 數棧多個子產品(如離線平臺和實時平臺)也都有可能對Hive進行讀寫,所以如果基於方案3來做成本是非常高的。

方案1本身代價就已經較大,每次查詢前都進行一次資訊統計,資訊統計的時間是要算在本次查詢耗時中的,如果表資料量比較大增加的時間可能是十幾分鍾甚至更久。

綜合考慮,我們選用了更靈活合理的方案2來進行表資訊統計。雖然Spark SQL執行時得到的統計資訊可能不是最新的,但是總體相比較RBO來說還是有很大的效能提升。

接下來就為大家分享,數棧是如何如何統計收集原表資訊統計:

我們在離線平臺專案管理頁面上添加了表資訊統計功能,保證了每個專案可以根據專案本身情況配置不同的觸發策略。觸發策略可配置按天或者按小時觸發,按天觸發支援配置到從當天的某一時刻觸發,從而避開業務高峰期。配置完畢後,到了觸發的時刻離線平臺就會自動以專案為單位提交一個Spark任務來統計專案表資訊。

在數棧沒有實現CBO支援之前,Spark SQL的優化只能通過調整Spark本身的引數實現。這種調優方式很高的准入門檻,需要使用者比較熟悉Spark的原理。數棧CBO的引入大大降低了使用者的學習門檻,使用者只需要在Spark Conf中開啟

CBO-spark.sql.cbo.enabled=true

然後在對應專案中配置好表資訊統計就可以做到SQL優化了。

四、未來展望

在CBO優化方面持續投入研究後,Spark SQL CBO整體相比較RBO而言已經有了很大的效能提升。但這並不說明整個作業系統就沒有優化的空間了,已經拿到的進步只會鼓舞我們繼續進行更深層次的探索,努力往前再邁一步。

完成對CBO的初步支援探索後,數棧把目光看向了Spark 3.0 版本引入的新特性——AQE(Adaptive Query Execution)。

AQE是動態CBO的優化方式,是在CBO基礎上對SQL優化技術又一次的效能提升。如前文所說,CBO目前的計算對前置的原始表資訊統計是仍有依賴的,而且資訊統計過時的情況會給CBO帶來不小的影響。

如果在執行時動態的優化 SQL 執行計劃,就不再需要像CBO那樣需要提前做表資訊統計。數棧正在針對這一個新特性進行,相信不久的將來我們就能引入AQE,讓數棧在易用性高效能方面更上一層樓。希望小夥伴們保持關注,數棧願和大家一起成長。

原文來源:VX公眾號“數棧研習社” 袋鼠雲開源框架釘釘技術交流群(30537511),歡迎對大資料開源專案有興趣的同學加入交流最新技術資訊,開源專案庫地址:https://github.com/DTStack

「其他文章」