Kyuubi 實踐 | 如何優化 Spark 小檔案,Kyuubi 一步搞定!

語言: CN / TW / HK

Hive 表中太多的小檔案會影響資料的查詢效能和效率,同時加大了 HDFS NameNode 的壓力。Hive (on MapReduce) 一般可以簡單的通過一些引數來控制小檔案,而 Spark 中並沒有提供小檔案合併的功能。下面我們來簡單瞭解一下 Spark 小檔案問題,以及如何處理小檔案。

0 1

Spark 小檔案問題

  1.1 環境 

Kyuubi 版本:1.6.0-SNAPSHOT

Spark 版本:3.1.3、3.2.0 

  1.2   TPCDS 資料集 

Kyuubi 中提供了一個 TPCDS Spark Connector,可以通過配置 Catalog 的方式,在讀取時自動生成 TPCDS 資料。 

只需要將 kyuubi-spark-connector-tpcds_2.12-1.6.0-SNAPSHOT.jar 包放入 Spark jars 目錄中,並配置:

spark.sql.catalog.tpcds=org.apache.kyuubi.spark.connector.tpcds.TPCDSCatalog;

這樣我們就可以直接讀取 TPCDS 資料集:

use tpcds; 
show databases; 
use sf3000; 
show tables; 
select * from sf300.catalog_returns limit 10;

  1.3  小檔案產生

首先我們在 Hive 中建立一個 sample.catalog_returns 表,用於寫入生成的 TPCDS catalog_returns 資料,並新增一個 hash 欄位作為分割槽。

我們先關閉 Kyuubi 的優化,讀取 catalog_returns 資料並寫入 Hive:

Spark SQL 最終產生的檔案數最多可能是最後一個寫入的 Stage 的 Task 數乘以動態分割槽的數量 我們 可以看到由於讀取輸入表的 Task 數是 44 個,所以最終產生了 44 個檔案,每個檔案大小約 69 M。  

  1.4   改變分割槽數(Repartition)  

由於寫入的檔案數跟最終寫入 Stage 的 Task 資料有關,那麼我們可以通過新增一個 Repartition 操作, 來減少最終寫入的 task 數,從而控制小檔案:

新增 REPARTITION(10) 後,會在讀取後做一個 Repartition 操作,將 partition 數變成 10,所以最終 寫入的檔案數變成 10 個。 

  1.5   Spark AQE 自動合併小分割槽

Spark 3.0 以後引入了自適應查詢優化(Adaptive Query Execution, AQE),可以自動合併較小的分割槽。

開啟 AQE,並通過新增 distribute by cast(rand() * 100 as int) 觸發 Shuffle 操作:

預設 Shuffle 分割槽數 spark.sql.shuffle.partitions=200 ,如果不開啟 AQE 會產生 200 個小檔案, 開啟 AQE 後,會自動合併小分割槽,根據 spark.sql.adaptive.advisoryPartitionSizeInBytes=512M 配置合併較小的分割槽,最終產生 12 個檔案。

0 2

Kyuubi 小檔案優化分析

Apache Kyuubi (Incubating) 作為增強版的 Spark Thrift Server 服務,可通過 Spark SQL 進行大規模的 資料處理分析。Kyuubi 通過 Spark SQL Extensions 實現了很多的 Spark 優化,其中包括了 RepartitionBeforeWrite 的優化,再結合 Spark AQE 可以自動優化小檔案問題,下面我們具體分析 一下 Kyuubi 如何實現小檔案優化。

  2.1   Kyuubi 如何優化小檔案

 Kyuubi 提供了在寫入前加上 Repartition 操作的優化,我們只需要將 kyuubi-extension-spark-3- 1_2.12-1.6.0-SNAPSHOT.jar 放入 Spark jars 目錄中,並配置 spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension 。相關配置:

通過 spark.sql.optimizer.insertRepartitionNum 引數可以配置最終插入 Repartition 的分割槽數, 當不開啟 AQE,預設為 spark.sql.shuffle.partitions 的值。 需要注意,當我們設定此配置會導致 AQE 失效,所以開啟 AQE 不建議設定此值 。 

對於動態分割槽寫入,會根據動態分割槽欄位進行 Repartition,並新增一個隨機數來避免產生資料傾斜, spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum 用來配置隨機數的範圍,不 過新增隨機數後,由於加大了動態分割槽的基數,還是可能會導致小檔案。這個操作類似在 SQL 中新增 distribute by DYNAMIC_PARTITION_COLUMN, cast(rand() * 100 as int)

  2.2   靜態分割槽寫入

開啟 Kyuubi 優化和 AQE,測試靜態分割槽寫入:

可以看到 AQE 生效了,很好地控制了小檔案,產生了 11 個檔案,檔案大小 314.5 M 左右。

  2.3   動態分割槽寫入

我們測試一下動態分割槽寫入的情況,先關閉 Kyuubi 優化,並生成 10 個 hash 分割槽:

產生了 44 × 10 = 440 個檔案,檔案大小 8 M 左右。

開啟 Kyuubi 優化和 AQE:

產生了 12 × 10 = 120 個檔案,檔案大小 30 M 左右,可以看到小檔案有所改善,不過仍然不夠理想。

此案例中 hash 分割槽由 rand 函式產生,分佈比較均勻,所以我們將 spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum 設定成 0 ,重新執行,同時將動態分割槽數設定為 5

由於動態分割槽數只有 5 個,所以實際上只有 5 個 Task 有資料寫入,每個 Task 對應一個分割槽,導致最終每個分割槽只有一個較大的大檔案。

通過上面的分析可以看到,對於動態分割槽寫入,Repartition 的優化可以緩解小檔案,配置 spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum=100 解決了資料傾斜問題, 不過同時還是可能會有小檔案。 

  2.4   Rebalance 優化  

Spark 3.2+ 引入了 Rebalance 操作,藉助於 Spark AQE 來平衡分割槽,進行小分割槽合併和傾斜分割槽拆 分,避免分割槽資料過大或過小,能夠很好地處理小檔案問題。 

Kyuubi 對於 Spark 3.2+ 的優化,是在寫入前插入 Rebalance 操作,對於動態分割槽,則指定動態分割槽列 進行 Rebalance 操作。不再需要 spark.sql.optimizer.insertRepartitionNum spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum 配置。

測試靜態分割槽寫入 ,使用 Spark 3.2.0 開啟 Kyuubi 優化和 AQE:

Repartition 操作自動合併了小分割槽,產生了 11 個檔案,檔案大小 334.6 M 左右,解決了小檔案的問題。

測試動態分割槽寫入 ,使用 Spark 3.2.0 開啟 Kyuubi 優化和 AQE,生成 5 個動態分割槽:

Repartition 操作自動拆分較大分割槽,產生了 2 × 5 = 10 個檔案,檔案大小 311 M 左右,很好地解決傾斜問題。

0 3

總 結

從上面的分析可以看到,對於 Spark 3.2+,Kyuubi 結合 Rebalance 能夠很好地解決小檔案問題,對於 Spark 3.1,Kyuubi 也能自動優化小檔案,不過動態分割槽寫入的情況還是可能存在問題。

相關的配置總結:

更多 AQE 配置可以參考: How To Use Spark Adaptive Query Execution (AQE) in Kyuubi

END

如果你也想成為 Committer

:point_down::point_down: :point_down::point_down: :point_down::point_down:

·Become A Committer of Apache Kyuubi

看到這裡記得多多點贊、評論、收藏

還可以把 Kyuubi 分享給更多朋友~