Apache Kyuubi 在愛奇藝的時間:加速 Hive SQL 遷移 Spark

語言: CN / TW / HK

Hive 作為愛奇藝數倉的基礎,Hive SQL 是愛奇藝大資料平臺目前主要的數處理工具,各個業務積累大量的 Hive ETL 任務。Spark 相對於 MapReduce 有著更為靈活的的計算模型,這使得 Spark 相對於 Hive (on MapReduce) 有更好的效能。

經過測試對比,我們發現遷移 Hive SQL 到 Spark 將會帶來很大的效能提升和資源節省。

Apache Kyuubi (Incubating) 專案提供一個分散式多租戶的 Spark Thrift Server,相對於 Spark 原生的 Spark Thrift Server 有更好的架構優勢和更多優秀的特性,具體對比可參考:Kyuubi v.s. Spark Thrift JDBC/ODBC Server (STS)

Hive SQL 遷移 Spark

1.1 雙跑對比

大資料平臺中已有大量穩定執行的 Hive SQL 任務,為了在遷移的過程中提高使用者遷移意願,降低與使用者的溝通成本,我們需要保證遷移後 Spark SQL 的穩定性以及資料準確性,並儘量減少使用者操作。

我們在大資料平臺中新增了 Hive SQL 遷移服務,提供一鍵批量雙跑測試和一鍵遷移功能。在遷移前, 我們會對業務的 Hive SQL 任務進行批量雙跑測試。改寫業務 SQL 將輸出替換成臨時庫表,並分別通過 Hive SQL 和 Spark SQL 執行。

執行完成後,對 Hive SQL 和 Spark SQL 的執行結果進行校驗,確保遷移後資料一致。對每行資料進行 concat 後計算 crc32 並求和得到 checksum 值,通過對比兩個結果表的 checksum 值和 count 數,來確定資料是否一致。

select sum(cast(CRC32(concat(*)) as decimal(19,0))) as checksum, count(*) as count from mock_db.mock_table

通過雙跑測試,我們能夠提前發現 Hive SQL 與 Spark SQL 的一些相容性問題,以及配置不合理導致任 務失敗的問題。及時優化平臺配置,指導使用者優化 SQL 併為任務提供合適的配置。

1.2 相容性適配

Hive SQL 遷移到 Spark 的過程中,我們遇到了一些語法相容性的問題,例如:

  • Spark 刪除不存在分割槽報錯,而 Hive 中允許刪除不存在分割槽
  • Spark 執行 set mapreduce.job.reduces=-1 報錯
  • 數字型別與字串比較,結果與 Hive 不一致
  • .....

為了加快遷移的進度,減少使用者參與,我們需要相容部分 Hive 語法。

在 Kyuubi 中提供 kyuubi-extension-spark 模組,維護 Spark SQL的 Extensions,用於優化Spark SQL的執行計劃,社群已經提供了很多優化,詳細文件:Auxiliary SQL extension for Spark SQL 。我們基於此模組,為不相容的語法修改其執行計劃,使得與 Hive 保持一致的語義。

例如,在 Hive 中由於預設的 hive.exec.drop.ignorenonexistent=true 配置,在刪除不存在的表或分割槽時不會報錯,但是遷移到 Spark 執行後,出現大量刪除不存在分割槽的錯誤。我們在Kyuubi中實現了 DropIgnoreNonexistent 優化,改寫 AlterTableDropPartitionCommand/DropTableCommand 等命令的執行計劃,將 ifExists 屬性設定為 true ,從而在刪除不存在分割槽時不報錯。

Kyuubi提供了只解釋SQL執行計劃的執行模式,通過 kyuubi.operation.plan.only.mode 配置,可以以 PARSE/ANALYZE/OPTIMIZE/PHYSICAL/EXECUTION 的一些模式解釋 SQL 的執行計劃而不執行它,這樣我們可以快速掃描出有語法相容問題的一些 SQL。

1.3 小檔案優化 

Hive SQL 遷移 Spark 執行不可避免地會出現小檔案的問題。我們使用 Kyuubi 的 Spark Extensions 結合 Spark AQE 可以有效地控制小檔案問題。

Kyuubi Spark Extensions 中提供了 RepartitionBeforeWrite 的優化,在寫入前插入 repartition 操作,再結合 AQE 合併小分割槽,從而實現控制小檔案。

對於寫入動態分割槽的情況,Kyuubi 中 Repartition 操作指定了動態分割槽欄位,這樣可以減小動態分割槽導致的小檔案。如果動態分割槽資料分佈不均勻,可能導致部分分割槽資料傾斜,所以對於動態分割槽寫入 Kyuubi 在 Repartition 時,同時加入了一個隨機數字段來避免資料傾斜。不過這樣由於擴充套件了分割槽欄位 的基數,也可能會導致小檔案問題。

在 spark-defaults.conf 中新增相關的配置:

spark.sql.extensions org.apache.kyuubi.sql.xyuubiSparksQ1Extension


# Kyuubi 
# spark.sql.optimizer.insertmepartitionneforewrite.enabled true 

# AQE 
spark.sql.adaptive.enabled true 
spark.sql.adaptive.advisoryPartitionSizexnaytes 1024m 
spark. sql .adaptive. coal esceParti ti on s .mi nmarti ti onmum 1 

對於 Spark3.2,Kyuubi 通過在寫入前插入 Rebalance 運算元,更好地解決小檔案的問題。

Kyuubi 服務增強

2.1 標籤化配置

對於不同使用者和業務的 SQL,由於資料量和業務處理邏輯存在差異,可能需要對不同的任務進行單獨的 配置優化。我們在資料開發平臺上提供了使用者配置引數的能力,允許使用者對 SQL 任務新增配置。

對於同一使用者或者業務會存在一些具有相同的特效的一些 SQL,我們提供標籤化配置,定義一些標籤繫結特有的配置,並在任務提交時帶上相關的標籤。

例如,我們定義的部分標籤如下:

  • Adhoc:用於即席查詢平臺任務,資料量小,要求快速響應。配置 USER 共享級別的引擎,較大 Driver 記憶體,以及 Spark 任務搶佔策略等。
  • Batch:用於資料開發平臺任務,定時排程執行,穩定要求較高。配置 CONNECTION 級別獨立引 擎,使得任務完全資源隔離,並新增小檔案、AQE 等優化配置。
  • User/Business/Custom:允許定義使用者、業務或其他自定義標籤,繫結特有的一些配置,如:佇列、資源、相容性適配相關配置等,降低使用者使用門檻。

Kyuubi 中已經支援了對使用者新增預設配置,例如在配置檔案中新增 \_\_\_bob\_\_\_.spark.executor.memory=8g 配置,為 bob 使用者的任務預設指定 8G 的 executor 記憶體,具體使用參考:User Default

同時,在Kyuubi 的 kyuubi-server-plugin 中,提供了 SessionConfAdvisor 介面,用於為 Session 注入配置。可以將標籤配置儲存在資料庫中,並實現 SessionConfAdvisor 介面,根據 Session 配置 的標籤從資料庫中獲取對應的配置。

2.2 SQL 審計

Kyuubi 服務中定義了一些事件,可用於 SQL 審計操作。

Kyuubi Server 中定義瞭如下事件:

  • KyuubiServerInfoEvent:Kyuubi Server 啟動、停止事件資訊
  • KyuubiSessionEvent:Session 開啟、關閉事件,以及連線相關資訊
  • KyuubiOperationEvent:Kyuubi Operation 執行事件,包括了 SQL 執行相關資訊

目前 Kyuubi 僅實現了 JSON 寫入本地檔案的方式採集事件,相關配置:

將事件寫入 JSON 檔案後,可以藉助日誌收集外掛採集到 Kakfa 中並落地到 ElasticSearch 中,方便後續 對 SQL 執行事件進行審計分析。

我們在 Kyuubi Server 中通過實現自定義的 EventHandler ,處理 Kyuubi Server 的事件,直接將事件寫入到 ElasticSearch 中。分析失敗 SQL 執行事件的錯誤資訊,完善 Spark SQL 運維知識庫,並新增正則匹配分析規則,匹配執行事件的錯誤資訊給出對應的解決方案,方便使用者自行排障。

文中文件可參考: