Apache Kyuubi on Spark 在CDH上的深度實踐

語言: CN / TW / HK

Kyuubi 是網易數帆主導開源的大資料專案,於2021年6月全票通過進入世界頂級開源基金會 Apache Software Foundation 孵化器。Kyuubi 的一個典型使用場景,是替換 HiveServer2,輕鬆獲得 10~100 倍效能提升。

CDH 最後一個免費版 6.3.2 釋出一年有餘,離線計算核心元件版本停在了 Hadoop 3.0.0,Hive 2.1.1,Spark 2.4.0。

隨著 Spark 3.0 的重磅釋出,大資料系統在效能方面又迎來了一次飛躍,本文將分享把 Spark 3 整合到 CDH 6.3.1(未開啟 Kerberos) 的過程,並使用 Kyuubi 替換 HiveServer2,實現 OLAP、ETL 等場景下從 HiveQL 到 SparkSQL 的無縫遷移,享受 10x-100x 的效能紅利。

CDH 缺陷修復

[ORC-125] 修復 Hive 不能讀取高版本 ORC 寫入的資料

當使用 Hive 讀取由 Presto 或者 Spark 等寫入的 ORC 檔案時,會出現以下錯誤。

ORC split generation failed with exception: java.lang.ArrayIndexOutOfBoundsException: 6

該問題在 ORC 上游被修復 [ORC-125] Correct OrcFile.WriterVersion to correctly use FUTURE。

ORC 最早是 Hive 的一個子專案,在 CDH 6 整合的 Hive 2.1 這個版本里,ORC 還沒有分離出去,所以這個問題要在 Hive 原始碼裡修復。

我做了一個打包好的修復版本,GitHub 傳送門 ,下載更換 /opt/cloudera/parcels/CDH/lib/hive/lib 路徑下的 hive-exec-2.1.1-cdh6.3.1.jar, hive-orc-2.1.1-cdh6.3.1.jar即可。(至少需要更換 Hadoop Client、HiveServer2 節點,如果你不知道我在說什麼,就把所有節點都換掉)

Spark 3.1

[SPARK-33212] Spark 使用 Hadoop Shaded Client

Hadoop 3.0 提供了 Shaded Client,用於下游專案規避依賴衝突 [HADOOP-11656] Classpath isolation for downstream clients。

Spark 3.2 在 hadoop-3.2 profile 中切換到了 Hadoop Shaded Client [SPARK-33212] Upgrade to Hadoop 3.2.2 and move to shaded clients for Hadoop 3.x profile。

該更改不是必須的,但個人建議從 Spark 主線將該補丁移植到 branch-3.1 使用,以規避潛在的依賴衝突。

[CDH-71907] Spark HiveShim 適配 CDH Hive

Spark 通過反射和隔離的類載入器來實現對多版本 Hive Metastore 的支援,詳情參考 Interacting with Different Versions of Hive Metastore - Spark Documentation

CDH 6 使用修改過的 Hive 2.1.1 版本,其方法簽名與 Apache 版本有所不同,故 Spark HiveShim 反射呼叫會出現找不到方法簽名,需要手動將 CDH-71907 補丁打到 branch-3.1。

Spark External Shuffle Service 協議相容

Spark shuffle 時,mapper 會將資料寫入到本地磁碟而非 HDFS,引入 ESS 後,會將檔案資訊註冊到 ESS 中,將 mapper 與 reducer 解耦。CDH 中,預設會啟用 Spark Yarn External Shuffle Service,作為 YARN AUX Service 在所有 Yarn Node 上啟動。

Spark 3 修改了 shuffle 通訊協議,在與 CDH 2.4 版本的 ESS 互動時,需要設定 spark.shuffle.useOldFetchProtocol=true,否則可能報如下錯誤。[SPARK-29435] Spark 3 doesn't work with older shuffle service

IllegalArgumentException: Unexpected message type: <number>.

Spark 版本遷移指南

如果你有現存的基於 CDH Spark 2.4 的 Spark SQL/Job,在將其遷移至 Spark 3 版本前,請參閱完整的官方遷移指南。

Migration Guide: Spark Core - Spark Documentation 

Migration Guide: SQL, Datasets and DataFrame - Spark Documentation 

Spark 部署

官方文件 Running Spark on YARN - Documentation 中提到

To make Spark runtime jars accessible from YARN side, you can specify spark.yarn.archive or spark.yarn.jars. For details please refer to Spark Properties. If neither spark.yarn.archive nor spark.yarn.jars is specified, Spark will create a zip file with all jars under $SPARK_HOME

https://spark.apache.org/docs/latest/running-on-yarn.html#preparations

因此,無需在 CDH 所有節點上部署 Spark 3,只需在 Hadoop Client 節點上部署 Spark 3 即可。

如果你對叢集許可權管理沒有十分嚴格的要求,請使用 hive 使用者以避免許可權問題。

我基於 Spark 3.1.2 製作了一個適配 CDH 6 的版本, GitHub 傳送門 ,下載解壓至 /opt,並軟鏈至 /opt/spark3

[hive@cdh-kyuubi]$ ls -l /opt | grep sparklrwxrwxrwx  1 root         root           39 Aug 10 18:46 spark3 -> /opt/spark-3.1.2-cdh6-bin-3.2.2drwxr-xr-x 13 hive         hive         4096 Aug 10 18:46 spark-3.1.2-cdh6-bin-3.2.2

配置 Hadoop、Hive

CDH 會將配置檔案自動分發到所有節點 /etc 目錄下,建立軟鏈即可。

ln -s /etc/hadoop/conf/core-site.xml /opt/spark3/conf/ln -s /etc/hadoop/conf/hdfs-site.xml /opt/spark3/conf/ln -s /etc/hadoop/conf/yarn-site.xml /opt/spark3/conf/ln -s /etc/hive/conf/hive-site.xml /opt/spark3/conf/

配置 Spark 環境變數 /opt/spark3/conf/spark-env.sh

#!/usr/bin/env bashexport HADOOP_CONF_DIR=/etc/hadoop/conf:/etc/hive/confexport YARN_CONF_DIR=/etc/hadoop/conf.cloudera.yarn:/etc/hive/conf

配置 Spark 預設引數 /opt/spark3/conf/spark-defaults.conf

請參考 Configuration - Spark Documentation 根據叢集環境實際情況進行微調

spark.authenticate=falsespark.io.encryption.enabled=falsespark.network.crypto.enabled=falsespark.eventLog.enabled=truespark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistoryspark.driver.log.dfsDir=/user/spark/driverLogsspark.driver.log.persistToDfs.enabled=truespark.files.overwrite=truespark.files.useFetchCache=falsespark.serializer=org.apache.spark.serializer.KryoSerializerspark.shuffle.service.enabled=truespark.shuffle.service.port=7337spark.shuffle.useOldFetchProtocol=truespark.ui.enabled=truespark.ui.killEnabled=truespark.yarn.historyServer.address=http://cdh-master2:18088spark.yarn.historyServer.allowTracking=truespark.master=yarnspark.submit.deployMode=clusterspark.driver.memory=2Gspark.executor.cores=6spark.executor.memory=8Gspark.executor.memoryOverhead=2Gspark.memory.offHeap.enabled=truespark.memory.offHeap.size=2Gspark.dynamicAllocation.enabled=truespark.dynamicAllocation.executorIdleTimeout=60spark.dynamicAllocation.minExecutors=0spark.dynamicAllocation.schedulerBacklogTimeout=1spark.sql.cbo.enabled=truespark.sql.cbo.starSchemaDetection=truespark.sql.datetime.java8API.enabled=falsespark.sql.sources.partitionOverwriteMode=dynamicspark.sql.hive.convertMetastoreParquet=falsespark.sql.hive.convertMetastoreParquet.mergeSchema=falsespark.sql.hive.metastore.version=2.1.1spark.sql.hive.metastore.jars=/opt/cloudera/parcels/CDH/lib/hive/lib/*spark.sql.orc.mergeSchema=truespark.sql.parquet.mergeSchema=truespark.sql.parquet.writeLegacyFormat=truespark.sql.adaptive.enabled=truespark.sql.adaptive.forceApply=falsespark.sql.adaptive.logLevel=infospark.sql.adaptive.advisoryPartitionSizeInBytes=256mspark.sql.adaptive.coalescePartitions.enabled=truespark.sql.adaptive.coalescePartitions.minPartitionNum=1spark.sql.adaptive.coalescePartitions.initialPartitionNum=1024spark.sql.adaptive.fetchShuffleBlocksInBatch=truespark.sql.adaptive.localShuffleReader.enabled=truespark.sql.adaptive.skewJoin.enabled=truespark.sql.adaptive.skewJoin.skewedPartitionFactor=5spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=128mspark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin=0.2spark.sql.autoBroadcastJoinThreshold=-1

驗證 spark-shell 工作正常

[hive@cdh-kyuubi]$ /opt/spark3/bin/spark-shellSetting default log level to "WARN".To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).21/08/30 19:53:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable21/08/30 19:53:46 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.Spark context Web UI available at http://cdh-master1:4040Spark context available as 'sc' (master = yarn, app id = application_1615462037335_40099).Spark session available as 'spark'.Welcome to      ____              __     / __/__  ___ _____/ /__    _\ \/ _ \/ _ `/ __/  '_/   /___/ .__/\_,_/_/ /_/\_\   version 3.1.2-cdh6      /_/Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_221)Type in expressions to have them evaluated.Type :help for more information.scala> spark.sql("show databases").show+----------------+|       namespace|+----------------+|            test|+----------------+scala>

至此,Spark 3 on CDH 6 已經部署完成,可以像 CDH 自帶的 Spark 2.4 一樣,正常的使用 spark-submit、spark-shell,但依舊不支援 spark-sql,spark-thriftserver。

相比略顯雞肋的 spark-sql,spark-thriftserver,Kyuubi 是更好的選擇,因此我故意移除了這兩個功能。

注意:我提供的構建版,沒有啟用 spark-thriftserver 模組,為正常使用 Kyuubi,請下載hive-service-rpc-3.1.2.jar 新增到 /opt/spark3/jars 路徑。

Kyuubi —— 解鎖 Spark SQL 更多場景

簡言之,Apache Kyuubi (Incubating) 之於 Spark,類似 HiveServer2 之於 Hive。

Kyuubi 通過將 Spark 暴露一個與 HiveServer2 完全相容的 Thrift API,可以相容現有的 Hive 生態,如 beeline,hive-jdbc-driver,HUE,Superset 等。

可以通過以下文件來了解 Kyuubi 的架構,Kyuubi 與 HiveServer2 和 Spark Thrift Server 的異同。

  • Welcome to Kyuubi’s documentation 

  • Kyuubi Architecture — Kyuubi documentation 

  • Kyuubi v.s. HiveServer2 — Kyuubi documentation 

  • Kyuubi v.s. Spark Thrift JDBC/ODBC Server (STS) — Kyuubi documentation ⑦

Kyuubi 部署

Kyuubi 無需任何修改即可適配 CDH 6,下面給出關鍵步驟,詳情可以參考 Deploy Kyuubi engines on Yarn — Kyuubi documentation ⑧

同樣的,如果你對叢集許可權管理沒有十分嚴格的要求,請使用 hive 使用者以避免許可權問題。

解壓部署

下載 kyuubi-1.3.0-incubating-bin.tgz 解壓至 /opt,並建立軟鏈到 /opt/kyuubi。

[hive@cdh-external opt]$ ls -l /opt | grep kyuubilrwxrwxrwx  1 root         root           32 Aug 18 17:36 kyuubi -> /opt/kyuubi-1.3.0-incubating-bindrwxrwxr-x 13 hive         hive         4096 Aug 18 17:52 kyuubi-1.3.0-incubating-bin

修改配置

按需修改/opt/kyuubi/conf/kyuubi-env.sh

#!/usr/bin/env bashexport JAVA_HOME=/usr/java/defaultexport SPARK_HOME=/opt/spark3export SPARK_CONF_DIR=${SPARK_HOME}/confexport HADOOP_CONF_DIR=/etc/hadoop/conf:/etc/hive/confexport KYUUBI_PID_DIR=/data/log/service/kyuubi/pidexport KYUUBI_LOG_DIR=/data/log/service/kyuubi/logsexport KYUUBI_WORK_DIR_ROOT=/data/log/service/kyuubi/workexport KYUUBI_MAX_LOG_FILES=10

參考Kyuubi Configurations System — Kyuubi documentation 

按需修改/opt/kyuubi/conf/kyuubi-defaults.conf

kyuubi.authentication=NONEkyuubi.engine.share.level=USERkyuubi.frontend.bind.host=0.0.0.0kyuubi.frontend.bind.port=10009kyuubi.ha.zookeeper.quorum=cdh-master1:2181,cdh-master2:2181,cdh-master3:2181kyuubi.ha.zookeeper.namespace=kyuubikyuubi.session.engine.idle.timeout=PT10Hspark.master=yarnspark.submit.deployMode=clusterspark.dynamicAllocation.enabled=truespark.dynamicAllocation.minExecutors=0spark.dynamicAllocation.maxExecutors=20spark.dynamicAllocation.executorIdleTimeout=60

注意:kyuubi-defaults.conf 中的 spark 配置優先順序高於 spark-defaults.conf

效能調優

How To Use Spark Dynamic Resource Allocation (DRA) in Kyuubi — Kyuubi documentation  

How To Use Spark Adaptive Query Execution (AQE) in Kyuubi — Kyuubi documentation  

啟動

使用/opt/kyuubi/bin/kyuubi 在前臺或後臺啟動 Kyuubi Server。

[hive@cdh-kyuubi]$ /opt/kyuubi/bin/kyuubi --helpUsage: bin/kyuubi command  commands:    start        - Run a Kyuubi server as a daemon    run          - Run a Kyuubi server in the foreground    stop         - Stop the Kyuubi daemon    status       - Show status of the Kyuubi daemon    -h | --help  - Show this help message

Beeline 連線

使用預設引數連線 Kyuubi

beeline -u jdbc:hive2://cdh-kyuubi:10009 -n bigdata

使用自定義引數連線 Kyuubi

beeline -u "jdbc:hive2://cdh-master2:10009/;?spark.driver.memory=8G#spark.app.name=batch_001;kyuubi.engine.share.level=CONNECTION" -n batch

詳細配置參考 Access Kyuubi with Hive JDBC and ODBC Drivers — Kyuubi documentation 

HUE 連線

簡單說,在 Cloudera Manager 中修改 HUE 配置項 Hue Service Advanced Configuration Snippet 如下,即可在 HUE 中開啟 Spark SQL 引擎。

[desktop] app_blacklist=zookeeper,hbase,impala,search,sqoop,security use_new_editor=true[[interpreters]][[[sparksql]]]  name=Spark SQL  interface=hiveserver2[[[hive]]]  name=Hive  interface=hiveserver2# other interpreters  ...[spark]sql_server_host=kyuubisql_server_port=10009

詳細配置參考 Getting Started with Kyuubi and Cloudera Hue — Kyuubi documentation 

Kyuubi engine 共享級別與應用場景

Kyuubi Spark engine 即一個 Spark driver,通過控制 engine 的共享策略,可以在隔離性和資源利用率上取得平衡。Kyuubi 提供 3 種 engine 共享級別,分別為 CONNECTION,USER(預設),SERVER。

下面的討論均假設使用 YARN Cluster 模式啟動 Kyuubi Spark engine。

我們首先補充一些時間開銷和 Spark 操作執行的資訊。

Kyuubi Spark engine 在冷啟動時,會由 Kyuubi Server 通過spark-submit 命令向 YARN 提交一個 Spark App,即 engine,該過程從提交到 Spark driver 啟動約需要 5-6s,然後 engine 將自己註冊到 Zookeeper,Kyuubi Server 監聽 Zookeeper 發現 engine 並建立連線,該過程約 1-2s,如此算來,在 YARN 資源空閒時,整個 engine 冷啟動時間約 6-8s;Client 連線到存在的 engine,通常耗時 1s 內。

如果啟動了 Spark 的 executor 動態伸縮特性,真正執行 SQL 任務時,如果資源有富餘,會動態建立 executor,每個 executor 建立耗時約為 2-3s。

元資料、DDL 等操作,如獲取 database 列表,CREATE TABLE語句等,會在 driver 上執行;計算任務如 JOIN、COUNT() 等,會由 driver 生成執行計劃,在 executor 上執行。

在我們的生產環境中,大概有如下三種使用場景:

(1)使用 HUE 進行 ad-hoc 查詢 

該場景中,會有多個使用者進行查詢,一般會執行相對較大的查詢任務,使用者對連線建立時間以及元資料載入時間較為敏感,但對查詢結果響應時間有一定的容忍性。在這種場景中,使用預設的 USER 共享級別,每個使用者只使用一個 Spark engine,配合使用 Spark 的動態伸縮特性,動態的建立和銷燬 executor,在保證使用者之間隔離的基礎上,降低啟動時間和資源佔用。建議的關鍵配置如下:

kyuubi.engine.share.level=USERkyuubi.session.engine.idle.timeout=PT1Hspark.dynamicAllocation.enabled=truespark.dynamicAllocation.minExecutors=0spark.dynamicAllocation.maxExecutors=30spark.dynamicAllocation.executorIdleTimeout=120

(2)使用 Beeline 執行批任務

SQL 該場景會使用統一的 batch 賬號提交 SQL 任務,對響應時間敏感度低,但對穩定性要求非常要,並且應儘可能的最大化利用叢集資源。推薦在該場景下將共享級別調整為 CONNECTION,這樣每個 SQL 執行將會使用獨立的 Spark driver,並且 SQL 執行完畢後 Spark driver 立即退出,保障離線任務互不干擾,並且資源及時釋放。建議的關鍵配置如下:

kyuubi.engine.share.level=CONNECTIONspark.dynamicAllocation.enabled=true# 根據任務具體資源消耗估算,從 workflow 整體上提升叢集資源利用率spark.dynamicAllocation.minExecutors=5spark.dynamicAllocation.maxExecutors=30

(3)使用 Superset 進行多資料來源聯邦查詢

該場景中,只有一個 service 賬號,會定時同時重新整理大量的圖表,對連線建立時間、查詢結果響應時間、併發度都有較高的要求。該場景中,driver 和 executor 的啟動時間都是不容忽視的,因此在 ad-hoc 查詢配置的基礎上,應延長 driver、executor 的閒置等待時間,並且設定最小的 executor 保活數量,保證時刻有 executor 常駐,能快速響應較小的查詢。建議的關鍵配置如下:

kyuubi.engine.share.level=USERkyuubi.session.engine.idle.timeout=PT10Hspark.dynamicAllocation.enabled=truespark.dynamicAllocation.minExecutors=6spark.dynamicAllocation.maxExecutors=10spark.dynamicAllocation.executorIdleTimeout=120

我們可以啟動三個 Kyuubi Server(單機或叢集)來分別應對如上的三種場景,但也可以僅啟動一個 Kyuubi Server(單機或叢集)來滿足不同的場景。

一種建議的實踐方式是 Kyuubi Server 配置使用預設的 USER 共享級別,這樣客戶端連線時,會使用預設配置;當 ETL 跑批場景時,可以通過 beeline 引數將本次連線共享級別調整為 CONNECTION;類似的,在 Superset 連線中配置獨立引數。

結語

現在去跑一些 SQL,體驗 Spark SQL 帶來的效能飛躍吧!

參考連結:

  1. https://github.com/pan3793/cdh-hive/releases/tag/v2.1.1-cdh6.3.1-fix
  2. https://spark.apache.org/docs/latest/core-migration-guide.html
  3. https://spark.apache.org/docs/latest/sql-migration-guide.html
  4. https://kyuubi.readthedocs.io/en/latest/index.html
  5. https://kyuubi.apache.org/docs/latest/overview/architecture.html
  6. https://kyuubi.apache.org/docs/latest/overview/kyuubi_vs_hive.html
  7. https://kyuubi.apache.org/docs/latest/overview/kyuubi_vs_thriftserver.html
  8. https://kyuubi.apache.org/docs/latest/deployment/on_yarn.html
  9. https://kyuubi.apache.org/docs/latest/deployment/settings.html
  10. https://kyuubi.apache.org/docs/latest/deployment/spark/dynamic_allocation.html
  11. https://kyuubi.apache.org/docs/latest/deployment/spark/aqe.html
  12. https://kyuubi.apache.org/docs/latest/client/hive_jdbc.html
  13. https://kyuubi.apache.org/docs/latest/quick_start/quick_start_with_hue.html#for-cdh-6-x-users

作者簡介

潘成,來自 Apache Kyuubi 社群

相關閱讀