Apache Kyuubi 助力 CDH 解鎖 Spark SQL

語言: CN / TW / HK

Apache Kyuubi(Incubating)(下文簡稱Kyuubi)是⼀個構建在Spark SQL之上的企業級JDBC閘道器,相容HiveServer2通訊協議,提供高可用、多租戶能力。Kyuubi 具有可擴充套件的架構設計,社群正在努力使其能夠支援更多通訊協議(如 RESTful、 MySQL)和計算引擎(如Flink)。

Kyuubi的願景是讓大資料平民化。一個的典型使用場景是替換HiveServer2,幫助企業把HiveQL遷移到Spark SQL,輕鬆獲得10~100倍效能提升(具體提升幅度與SQL和資料有關)。另外最近比較火的兩個技術,LakeHouse和資料湖,都與Spark結合得比較緊密,如果我們能把計算引擎遷移到Spark上,那我們離這兩個技術就很近了。

Kyuubi最早起源於網易,這個專案自誕生起就是開源的。在Kyuubi發展的前兩年,它的使用場景主要在網易內部。自從2020年底進行了一次架構大升級、釋出了Kyuubi 1.0之後,整個Kyuubi社群開始活躍起來了,專案被越來越多的企業採用,然後在今年6月進入了Apache基金會孵化器,並在今年9月釋出了進入孵化器後的第一個版本1.3.0-incubating。

Kyuubi vs SparkThriftServer vs HiveServer2

我們通過Kyuubi和其他SQL on Spark方案的對比,看看用Kyuubi替換 HiveServer2能帶來什麼樣的提升。圖中對Hive Server2標記了Hive on Spark,這是Hive2的一個功能,最早的Hive會把SQL翻譯成MapReduce來執行,Hive on Spark方案其實就是把SQL翻譯成Spark運算元來執行,但是這僅僅是物理運算元的替換,因為複用了Hive的SQL解析邏輯,所以SQL方言還是HiveQL,包括後續SQL的改寫、優化走的都是Hive的優化器。Spark2放棄了Hive on Spark方案,選擇從頭開始做SQL解析、優化,創造了Spark SQL和Catalyst。所以說,Spark Thrift Server只是相容了HiveServer2的Thrift通訊協議,它整個SQL的解析、優化都被重寫了。

圖片

Kyuubi也是用Spark來解析、優化和執行SQL,所以對於使用者來說,客戶端與服務端的通訊協議是一模一樣的,因為大家相容的都是HiveServer2的通訊協議。但是在SQL方言上,Kyuubi和Spark Thrift Server是Spark SQL,HiveServer2是HiveQL。SQL的編譯和優化過程,HiveServer2在本程序上進行,Spark是在Driver端進行。對於STS,Thrift Server和Driver在同一個程序內;對於Kyuubi,Thrift Server和Spark Driver是分離的,它們可以跑在同一臺機器上的不同程序(如YARN Client模式),也可以跑在不同的機器上(如YARN Cluster模式)。

對於執行階段,當一條SQL提交後,HiveServer2會將其翻譯成了一個Spark Application,每一次SQL提交就會生成一個全新的Spark的應用,都會經歷Driver的建立、Executor的建立過程,SQL執行結束後再將其銷燬掉。Spark Thrift Server則是完全相反的方式,一個Spark Thrift Server只持有一個Driver,Driver是常駐的,所有的SQL都會由這一個Driver來編譯、執行。Kyuubi不僅對這兩種方式都支援,還支援了更靈活的Driver共享策略,會在後續詳細介紹。

Kyuubi on Spark與CDH整合

CDH是使用最廣泛的Apache Hadoop發行版之一,其本身集成了Spark,但是禁用了Spark Thrift Server功能和spark-sql命令,使得使用者只能通過spark-shell、spark-submit使用Spark,故而在CDH上使用Spark SQL具有一定的門檻。在CDH上SQL方案用得更多的往往是Hive,比如說我們可以通過Beeline、HUE連線HiveServer2,來進行SQL批任務提交或互動式查詢,同時也可以通過Apache Superset這類BI工具連線到HiveServer2上做資料視覺化展示,背後最終的計算引擎可以是MapReduce,也可以是Spark。

圖片

當我們引入Kyuubi後,圖中左側的這些Client都是不需要更改的,只需要部署Spark3和Kyuubi Server(當前Kyuubi僅支援Spark3),再把Client連線地址改一下,即可完成從HiveQL到Spark SQL的遷移。當然其間可能會碰到HiveQL方言跟Spark SQL方言的差異性問題,可以結合成本選擇修改Spark或者修改SQL來解決。

圖片

Kyuubi中有一個重要的概念叫作引擎共享級別(Engine Share Level),Kyuubi通過該特性提供了更高階的Spark Driver共享策略。Spark Engine與Spark Driver概念是等價的,都對應一個Spark Application。Kyuubi目前提供了三種Engine共享級別,SERVER、USER和CONNECTION,預設是USER模式。

SERVER模式類似Spark Thrift Server,Kyuubi Server只會持有一個Spark Engine,即所有的查詢都會提交到一個Engine上來跑。USER模式即每個使用者會使用獨立的Engine,能做到使用者級別的隔離,這也是更具普適性的一種方式,一方面不希望太浪費資源,每個SQL都起一個Engine,另一方面也希望保持一定的隔離性。CONNECTION模式是指Client每建立一個連線,就建立一個Engine,這種模式和Hive Server2較為類似,但不完全一致。CONNECTION模式比較適合跑批計算,比如ETL任務,往往需要數十分鐘甚至幾個小時,使用者不希望這些任務互相干擾,同時也希望不同的SQL有不同的配置,例如為Driver分配2G記憶體還是8G記憶體。總的來說,CONNECTION模式比較適合跑批任務或者大任務,USER模式比較適合HUE互動式查詢的場景。

大家可能會擔心,我們是不是對Spark做了很厚的一層包裝,限制了很多功能?其實不是的,所有的Spark的configuration在Kyuubi中都是可用的。當一個請求傳送給Kyuubi時,Kyuubi會去找一個合適的Engine跑這個任務,如果找不到它就會通過拼接spark-submit命令來建立一個Engine,所以所有的Spark支援的configuration都可以用。如圖中列舉了YARN Client和YARN Cluster兩種模式,怎麼寫配置,Spark Driver就跑在什麼地方,改成K8s,Driver就跑在Kubernetes裡面,所以Kyuubi對Kubernetes的支援也是水到渠成的。

還有一個使用者常問的問題,是不是USER模式或者CONNECTION模式,每一個場景都需要單獨部署一套Kyuubi Server?不需要。我們可以把常用的配置固化在kyuubi-defaults.conf裡面,當Client連線Kyuubi Server時,可以通過在URL裡面寫一些配置引數來覆蓋預設配置項,比如預設使用的是USER模式,提交批任務時選擇CONNECTION模式。

Kyuubi 引擎隔離級別的實現

Kyuubi多種靈活的隔離級別是怎麼實現的?下圖綠色的元件即Service Discovery Layer,目前通過ZooKeeper實現,但它本質上是一個服務註冊/發現的元件。我們已經發現有開發者把Kyuubi整合到Kubernetes上,用API Server來實現Service Discovery Layer,該功能目前還沒有提交給社群。

圖片

上圖中的User Side即Client側,社群目前策略是完全相容Hive的Client,以更好的複用Hive生態。Hive本身通過ZooKeeper實現了Client側的HA模式,Server啟動以後,會將自身按照一定的規則註冊到ZooKeeper裡面,Beeline或者其他Client連線的時候,將連線地址設定成ZooKeeper叢集的地址,Client即可以發現指定的路徑下所有的Server,隨機選擇其中一個來連線。這是HiveServer2協議的一部分,Kyuubi是完全相容的。

Kyuubi Server與Engine之間也使用了類似的服務註冊與發現機制,Engine在Zookeeper上的註冊路徑會遵守一定的規則,例如在USER隔離級別下,路徑規則是/kyuubi_USER/{username}/{engine_node}。Kyuubi Server接到連線請求,會按照規則從特定路徑下查詢可用的Engine節點。若找到,直接連線該Engine;若找不到,建立一個Engine並等待其啟動完成,Engine啟動完成後,會按照相同的規則在指定路徑下建立一個engine節點,並把自己的(包含連線地址、版本等)填寫到節點裡。在YARN Cluster模式下,Engine會被分配到YARN叢集任意節點啟動,正是通過這種機制,Server才能找到並連線Engine。

對於CONNECTION模式,每一個Engine只會被連線一次,為了實現這個效果,Kyuubi設計瞭如下的路徑規則,/kyuubi_CONNECTION/{username}/{uuid}/{engine_node},通UUID的唯一性來實現隔離。所以Kyuubi實現引擎隔離級別的方式是一個非常靈活的機制,我們目前支援了SERVER、USER和CONNECTION,但通過簡單的擴充套件,它就可以支援更多更靈活的模式。在程式碼主線分支上,社群目前已經實現了額外的兩個共享級別,一個是GROUP,可以讓一組使用者來共享一個Engine;另外一個是Engine Pool,可以讓一個使用者來使用多個Engine以提高併發能力,一個適用的場景是BI圖表展示,例如為Superset配置一個Service Account,對應多個Engine,當幾百個圖表一起重新整理時,可以將計算壓力分攤到不同Engine上。

Kyuubi實踐 | 編譯Spark3.1以適配CDH5並整合Kyuubi

https://mp.weixin.qq.com/s/lgbmM1qNetuPB0-j-TAzkQ

Apache Kyuubi on Spark 在CDH上的深度實踐

https://my.oschina.net/u/4565392/blog/5264848

我們在Kyuubi的官方公眾號提供了兩篇文章,內容包含了將Kyuubi整合到CDH平臺上的具體操作過程,第一篇描述了與CDH5(Hadoop2,啟用Kerberos)的整合,第二篇描述了CDH6(Hadoop3,未啟用Kerberos)的整合。對於整合其他非CDH平臺的Hadoop發行版,也具有一定的參考價值。

圖片

 

Spark 3 特性以及 Kyuubi 帶來的增強

動態資源分配

首先是動態資源分配,Spark本身已經提供了Executor的動態伸縮能力。可以看到,這幾個引數配置在語義上是非常明確的,描述了Executor最少能有多少個,最多能有多少個,最大閒置時長,以此控制Executor的動態建立和釋放。

spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=0
spark.dynamicAllocation.maxExecutors=30
spark.dynamicAllocation.executorIdleTimeout=120

 

圖片

引入了Kyuubi後,結合剛才提到的Share Level和Engine的建立機制,我們可以實現Driver的動態建立,然後我們還引入了一個引數,engine.idle.timeout,約定Driver閒置了多長時間以後也釋放,這樣就實現Spark Driver的動態建立與釋放。

kyuubi.engine.share.level=CONNECTION|USER|SERVER
kyuubi.session.engine.idle.timeout=PT1H

這裡要注意,因為CONNECTION場景比較特殊,Driver是不會被複用的,所以對於CONNECTION模式,engine.idle.timeout是沒有意義的,只要連線斷開Driver就會立刻退出。

Adaptive Query Execution

Adaptive Query Execution(AQE)是Spark 3帶來的一個重要特性,簡而言之就是允許SQL邊執行邊優化。就拿Join作為例子來說,等值的Inner Join,大表對大表做Sort Merge Join,大表對小表做Broadcast Join,但大小表的判斷髮生在SQL編譯優化階段,也就是在SQL執行之前。

我們考慮這樣一個場景,兩個大表和大表Join,加了一個過濾條件,然後我們發現跑完過濾條件之後,它就變成了一個大表和一個小表Join了,可以滿足Broadcast Join的條件,但因為執行計劃是在沒跑SQL之前生成的,所以它還是一個Sort Merge Join,這就會引入一個不必要的Shuffle。這就是AQE優化的切入點,可以讓SQL先跑一部分,然後回頭再跑優化器,看看能不能滿足一些優化規則來讓SQL執行得更加高效。

圖片

另一個典型的場景是資料傾斜。我們講大資料不怕資料量大,但是怕資料傾斜,因為在理想情況下,效能是可以通過硬體的水平擴充套件來實現線性的提升,但是一旦有了資料傾斜,可能會導致災難。還是以Join為例,我們假定是一個等值的Inner Join,有個別的partition特別大,這種場景我們會有一些需要修改SQL的解決方案,比如把這些大的挑出來做單獨處理,最後把結果Union在一起;或者針對特定的Join Key加鹽,比如加一個數字字尾,將其打散,但是同時還要保持語義不變,就是說右表對應的資料要做拷貝來實現等價的Join語義,最後再把新增的數字字尾去掉。

可以發現,這樣的一個手工處理方案也是有一定規律可循的,在Spark中將該過程自動化了,所以在Spark3裡面開啟了AQE後,就能自動幫助解決這類Join的資料傾斜問題。如果我們的ETL任務裡面有很多大表Join,同時資料質量比較差,有嚴重的資料傾斜,也沒有精力去做逐條SQL的優化,這樣情況從HiveQL遷到Spark SQL上面可以帶來非常顯著的效能提升,10到100倍一點也不誇張!

Extension

Spark通過Extension API提供了擴充套件能力,Kyuubi提供了KyuubiSparkSQLExtension,利用Extension API在原有的優化器上做了一些增強。這裡列舉了其中一部分增強規則,其中有Z-order功能,通過自定義優化器規則支援了資料寫入時Z-order優化排序的的功能,並且通過擴充套件SQL語法實現了Z-order語法的支援;也有一些規則豐富了監控統計資訊;還有一些規則限制了查詢分割槽掃描數量,和結果返回數量等。

圖片

以RepartitionBeforeWriteHive為例做下簡單介紹,這條規則用於解決Hive的小檔案寫入問題。對於Hive動態分割槽寫入場景,如果執行計劃最後一個stage,在寫入Hive表之前,DataFrame的Partition分佈與Hive表的Partition分佈不一致,在資料寫入時,每一個task就會將持有的資料寫到很多Hive表分割槽裡面,就會生成大量的小檔案。當我們開啟RepartitionBeforeWriteHive規則以後,它會在寫入Hive表之前依照Hive表的分割槽插入一個Repartition運算元,保證相同Hive表分割槽的資料被一個task持有,避免寫入產生大量的小檔案。

Kyuubi為所有規則都提供了開關,如果你只希望啟用其中部分規則,參考配置文件將其開啟即可。KyuubiSparkSQLExtension提供一個Jar,使用時,可以把Jar拷貝到${SPAKR_HOME}/jars下面,或命令引數--jar新增Jar,然後開啟KyuubiSparkSQLExtension,並根據配置項來選擇開啟特定功能。

Spark on ClickHouse

Data Source V2 API也是Spark3引入的一個重要特性。Data Source V2最早在Spark 2.3提出,在Spark 3.0被重新設計。下圖用多種顏色標記不同的Spark版本提供的Data Source V2 API,我們可以看到,每個版本都加了大量的API。可以看出,DataSourceV2 API 功能十分豐富,但我們更看重的是它有一個非常良好的擴充套件性,使得API可以一直進化。

圖片

Apache Iceberg在現階段對Data Source V2 API提供了一個比較完整的適配,因為Iceberg的社群成員是Data Source V2 API主要設計者和推動者,這也為我們提供了一個非常好的Demo。

ClickHouse目前在OLAP領域,尤其在單表查詢領域可以說是一騎絕塵,如果我們能結合Spark和ClickHouse這兩個大資料元件,讓Spark讀寫ClickHouse像訪問Hive表一樣簡單,就能簡化很多工作,解決很多問題。

基於Data Source V2 API,我們實現了並開源了Spark on ClickHouse,除了適配API提升Spark操作ClickHouse的易用性,也十分注重效能,其中包括了支援本地表的透明讀寫。ClickHouse基於分散式表和本地表實現了一套比較簡單的分散式方案。如果直接寫分散式表,開銷是比較大的,一個變通的方案是修改調整邏輯,手動計算分片,直接寫分散式表背後的本地表,該方案繁瑣且有概率出錯。當使用Spark ClickHouse Connector寫ClickHouse分散式表時,只需使用SQL或者DataFrame API,框架會自動識別分散式表,並嘗試將對其分散式表的寫入轉化成對本地表的寫入,實現自動透寫本地表,帶來很大的效能提升。

專案地址:

https://github.com/housepower/spark-clickhouse-connector

圖片

下圖描繪了經過改造之後的新一代資料平臺,改造帶來了十分顯著的收益。在硬體資源不變的情況下,首先,Daily Batch ETL從8個小時下降到了2個小時;其次,通過引入Iceberg和增量同步,資料的時效性是從天級降至十分鐘級;第三,收縮計算引擎,原有平臺需要搭配Hive、Elasticsearch、Presto、MongoDB、Druid、Spark、Kylin等多種計算引擎滿足不同的業務場景,在新平臺中,Spark和ClickHouse可以滿足大部分場景,大大減少了計算引擎的維護成本;最後,縮短了資料鏈路,在以前,受限於RDMS的計算能力,我們往往在資料展示前需要進行一遍一遍的加工,最終把聚合結果放到MySQL裡面,但是當引入ClickHouse以後,通常只要把資料加工成主題大寬表,報表展示藉助ClickHouse強悍的單表計算能力,現場計算就可以了,所以資料鏈路會短很多。

圖片

 

Kyuubi 社群展望

最後展望一下Kyuubi社群未來的發展。剛才提到了Kyuubi架構是可擴充套件的,目前Kyuubi相容HiveServer2,是因為僅實現了Thrift Binary協議;Kyuubi目前僅支援Spark引擎,Flink引擎正在開發中,這項工作由T3出行的社群夥伴在做。RESTful Frontend社群也正在做,這樣我們也可以提供RESTful API。我們還計劃提供MySQL的API,這樣使用者就可以直接使用MySQL Client或MySQL JDBC Driver來連線。後面附了社群裡面相關的issue或者PR。

圖片

如下是即將出現在1.4版本中的一些特性,包括剛才已經提到過的Engine Pool、Z-order、Kerberos相關的解決方案,以及新發布的Spark 3.2的適配工作。

[KYUUBI #913] Support long running Kerberos enabled SQL engine
[KYUUBI #939] Add Z-Order extensions to support optimize SQL
[KYUUBI #962] Support engine pool feature
[KYUUBI #981] Add more detail stats to kyuubi engine page
[KYUUBI #1059] Add Plan Only Operations
[KYUUBI #1085] Add forcedMaxOutputRows rule for limitation
[KYUUBI #1131] Rework kyuubi-hive-jdbc module
[KYUUBI #1206] Support GROUP for engine.share.level
[KYUUBI #1224] Support Spark 3.2
. . .

最後展示一下已知使用Kyuubi的企業,如果你也使用Kyuubi,或者在調研企業級的Spark SQL Gateway方案,有任何相關的問題,歡迎可以到我們社群裡面分享、討論。

GitHub分享頁面:

https://github.com/apache/incubator-kyuubi/discussions/925

 

圖片

 

結語

本文根據網易數帆大資料平臺專家、Apache Kyuubi(Incubating) PPMC成員潘成在 Apache Hadoop Meetup 2021 北京站的分享內容整理,重點結合實際落地的案例講述瞭如何實現 Apache Kyuubi(Incubating) on Spark和CDH整合,以及該方案為企業資料平臺建設帶來的收益,並展望了Apache Kyuubi(Incubating) 社群未來的發展。

圖片

 

Apache Kyuubi(Incubating)專案地址:

https://github.com/apache/incubator-kyuubi

影片回放:

https://www.bilibili.com/video/BV1Lu411o7uk?p=20

案例分享:

Apache Kyuubi 在 T3 出行的深度實踐