T3 出行 Apache Kyuubi Flink SQL Engine 設計和相關實踐

語言: CN / TW / HK

在日前的 Apache SeaTunnel & Kyuubi 聯合 Meetup 上,T3出行大資料平臺負責人、 Apache Kyuubi committer 楊華和T3出行高階大資料工程師李心愷共同分享了 Apache Kyuubi(Incubating) 在T3出行的最新實踐與應用,包括基於 Kyuubi 設計的 Flink SQL Engine,Kyuubi 與 Apache Linkis 的整合,以及在T3出行的落地實踐。

JDBC 之於 Flink 的現狀

首先我們來聊一下 Apache Flink 社群 JDBC 的現狀,Flink 在最初是一個流式計算(data flow)的模型,後來將批處理也引入了 data flow 上來,形成了“流批一體”的理念。

Flink 的批處理逐步在發展,但相比於一些成熟的離線計算引擎,比如 Hive、Spark SQL 而言,還不是很成熟,表現之一就是對 JDBC 規範的支援比較欠缺。當然 Flink 社群為此也做出了一些努力,這裡給大家分享兩來自 Flink 的德國的母公司 Ververica 所開源的兩個專案,一個叫做 Flink SQL Gateway,一個叫做 Flink JDBC Driver。

這兩個專案結合起來是能夠讓 Flink 支援 JDBC 的,但是這兩個專案已經有一年多的時間不太活躍了,最後的一個貢獻還停留在 2020 年。根據 Ververica 的現狀估計,這兩個專案再次活躍的可能性並不大。

Flink SQL Gateway 和 Flink JDBC Driver 的設計和實現,這是我基於原始碼畫出的,大致的就是這兩個元件結合起來,跟 Flink 互動提供對 JDBC 的支援。我們可以分成三層來看,最底下的 JDBC Driver 內部是通過 RESTful API 跟 Flink SQL Gateway 的程序所提供的服務去互動,走的是一個 HTTP 的協議。Flink SQL Gateway 可以看作是一個圍繞 flink-client 包裝出來的服務,在內部引入了 Session 的概念,並且提供了 SessionManager,可以認為它對資源做了一些 share 或者 cache 的能力,主入口是 SQLGatewayEndpoint,內部也提供了很多 Operation 的實現。

這些 Operation 可以分為兩大類,一類是本地的 Operation,在記憶體中對元資料進行管理,比如說 catalog 資料;另外一類是 JobOperation。這兩類 Operation 去跟 Flink Cluster 互動,一個是 Insert ,一個是 Selector,向 Flink 叢集去提交作業。

我們可以看到這兩個設計和實現是專門針對 Flink 的。雖然 Flink 提出了流批一體的概念,但據我們瞭解,目前只用 Flink 來把流和批處理的能力全部實現的公司並不多,絕大部分公司還處於 Spark 和 Flink 兩個引擎共存的局面。所以對於一些通用能力,比如說多租戶或者是對JDBC的支援,我們希望能夠提供一個抽象了的實現,能夠同時 Flink 和 Spark 這樣的引擎,未來能夠支援更多的引擎,所以我們的考慮,就是直接基於 Kyuubi 來Flink SQL Engine 的整合。

Flink SQL Engine 的設計與實現

下面介紹 Flink SQL Engine 的設計和實現。這是我畫的一個高層抽象的元件的互動圖,也可以拆成三個層次。最底下是一些 JDBC 或者 REST 的 client,他們會直接跟 Kyuubi Server 來互動。Kyuubi Server 內部分了兩層的實現,最前端是 frontend layer,提供了不同協議的 frontend 的實現,後面是 backend layer,再往後這個 backend 會去跟 Engine 的 frontend 去進行互動。

Engine 在 Kyuubi 裡面的設計其實也分為 frontend 和 backend 兩個層次。在 Flink Engine 的實現,我們提供了 FlinkThrift Frontend Service, backend layer 我們提供了一個 Flink SQL Backend Service,它會去跟 Flink SQL Manager 和 Operation Manager 去互動。

Kyuubi Flink Engine 在整個 Kyuubi 的生態裡面所處的位置,是在縱向的第五排 Kyuubi Engine .我們也可以看到社群即將推出的對 Trino(Presto SQL) 的支援,整體來看 Kyuubi 對其他技術的態度其實是很開放的,它並不要求我們對其他元件去做很多的改動來適配這套體系,這在企業技術選型中,對異構的架構特別友好。

Flink SQL QuickStart DEMO

接下來我們來看 Flink SQL QuickStart 的 DEMO,第一步我們是基於 Kyuubi 的原始碼來構建一個二進位制的釋出包,因為在 1.5.0 版本之前 release 的包裡面, Flink SQL Engine 還沒有被包含進去。(目前 Kyuubi 1.5.0 版本已經發布,包含了 Flink SQL Engine (Beta) 的支援,詳見 http://kyuubi.apache.org/release/1.5.0-incubating.html)

這是 Mac 本機的一個 DEMO,所以我們需要 Hadoop classpath 新增到加到 PATH 環境變數,並啟動一個 Flink 本地叢集。下一步就可以啟動 Kyuubi Server,再利用 Kyuubi 自帶的 Beeline 去連上 Flink Engine。Kyuubi 會自動拉起一個Flink Engine,它是一個 Java 程序。

進入到 Beeline 的互動式命令列裡面去,選擇特定的資料庫之後,就可以執行一些 DDL 和 DML 的操作。這裡我們的示例是建立一張表,然後調了一個 Insert 的語句。執行結束之後,我們開啟 Flink 的 WebUI,就可以看到一個 Insert 的 Job 被提交了,並且已經執行完成。

這個 DEMO 比較簡單,最近社群也有小夥伴提供了一個 on YARN 模式的 QuickStart 的支援和文件,有興趣的小夥伴也可以基於這個文件去了解 on YARN 的提交方式。

Flink SQL Engine 的實現離不開 Flink 社群大佬們的鼎力支援。這裡感謝一下阿里巴巴 Blink 團隊的蔣曉峰同學,和網易遊戲實時團隊的林小柏同學,一直在社群裡面比較活躍,貢獻了很多 PR。

Flink SQL Engine 目前提供的功能,我大致列了列了一些:

  • 常規的 DDL和 DML 操作,基本上都是支援的
  • JDBC規範的一些 Get 或者 Show 相關的方法也已經支援
  • Flink 所實現的一些設定或者重置屬性相關的方法
  • UDF 的支援
  • 目前的支援的 deploy 模式 Flink Session Standalone / on YARN 模式

Flink SQL Engine 展望

Flink SQL Engine 未來的計劃,首先它的 deploy mode 會有一個很大的變化,主要是往 Application 模式和 Session 模式去提供支援,業界已經用得最多的,也是比較成熟的 Per-Job 模式會被廢棄掉。Kyuubi 的 Flink Engine 未來應該主要是集中在 Application 模式,Session 模式也支援,但並不是第一選擇。

其他的一些規劃,包括 Flink SQL Engine on YARN(application) 的支援,Flink SQL Engine on Kubernetes 的支援,另外是對 Kyuubi 共享級別的增強,還有一些使用上的提升,比如支援 Session 裡面的 JAR 的管理。當然還有一些可能是需要大家使用之後去反饋的,也歡迎大家參與進來,一起貢獻。

Why Kyuubi

下面介紹 Kyuubi 在T3出行的一些應用場景。

T3出行是一個基於車聯網驅動的平臺,以車聯網的資料為主,基於車聯網資料的多樣性,T3出行構建了一個以 Apache Hudi 為基礎的企業級資料湖的平臺,而且在此之上構建了 BI 平臺,任務排程,機器學習,資料質量等等一系列的平臺,為業務提供了支撐。隨著業務的發展,平臺越來越多,對於這些平臺的統一管理也越來越複雜,業務小夥伴的使用體驗也變差。我們經過一系列的調研選型,決定選擇了微眾銀行開源的 DSS(DataSphere Studio) 作為一站式資料應用的互動管理平臺,並且根據公司的實際場景進行了一些定製化的開發。

下圖是 DSS 引入 Kyuubi 之前的架構,我們是通過 Kafka 和 CANAL 來定位資料,然後資料進入物件儲存,以 Hudi 的格式儲存在資料湖之上,資源編排是 YARN,計算引擎主要是 Spark、Hive 和 Flink。計算引擎通過計算中介軟體 Linkis 來統一管理互動,同時和 DSS 之間做了一個打通,在 Linkis 計算中介軟體之上構建了 BI 平臺和資料地圖,資料開發,機器學習等等,這多種平臺通過 DSS 這個一站式的入口來統一進入管理。

這個架構在實際使用中遇到了一些問題,比如說跨儲存的問題,現在資料分佈在 OBS 物件儲存, Hudi 的格式儲存,還有 ClickHouse、MongoDB 等不同的成熟資料,開發小夥伴就需要寫各種程式碼進行關聯分析,或者 ETL 的匯入,Linkis 對此解決還是比較有限的。還有 SQL語法的不統一,比如 Hive 或者原生的 Spark SQL 不支援 upsert、update、delete 等語法操作,還有 MongoDB、ClickHouse 等語法也各不相同,開發轉換成本比較高。同時 Linkis 和 Hive、Spark 版本是強耦合的,如果升級 Spark 版本,就需要修改一系列的原始碼,重新編譯,升級的難度比較大;同時 Linkis 中的 Spark 引擎對於 cluster 執行模式、 AQE、動態資源等特性的支援還不完善,改造的成本都比較大。

所以在此之上我們引入 Apache Kyuubi,T3出行使用了很長時間 Kyuubi,所以我們決定調研一下,看能否把 Kyuubi 和 Linkis 互相打通。Kyuubi 是一個統一的 Thrift JDBC 的服務,對接了 Spark 引擎和 Flink 引擎,Trino 社群也做了一些對接,所以能夠管理多種引擎,可以滿足 BI、ETL、ad-hoc 的一些場景,同時已經是進入了 Apache 孵化器,為企業級資料湖探索提供了一個標準化的介面,賦予使用者調動整個資料湖資料的一個能力,讓使用者能夠像處理普通資料一樣處理大資料,是一個 Serverless 的服務。

對比一下 Kyuubi 和 Linkis、Hive on Spark,如下表,Hive 和 Kyuubi 都是 JDBC 的介面,Linkis 自己提供了一些 HTTP 的 REST API,Linkis 主要基於 Spark 引擎或者 HiveQL 之類的語法,Kyuubi 主要是 Spark SQL 或者 Flink SQL 語法,Hive 還是自己的 HQL 語法。

SQL 解析 Hive 是 Server 端,Linkis 和 Kyuubi 都是 Engine 端。任務提交,Hive 拆分了多個 RemoteDriver 提交,Linkis 基於 Server 端進行的一個分散式的執行緒排程,Kyuubi 自己有一個資源隔離的機制,通過 USER、GROUP 或者 CONNECTION 的策略來隔離資源。

Linkis 和 Spark、Hive 的版本是繫結的,Kyuubi 就比較靈活,支援多版本適配,它和 Server 端和引擎端是分離的,比較便於我們升級和更新迭代。計算資源方面,Linkis 是基於自己的引擎管理,同時它使用的時候它是會把資源給鎖住了,Kyuubi 是基於 YARN 和 Kubernetes 的資源排程,在此之上通過 Engine 的維度來進行資源的管理,比 Linkis 要靈活很多。

Linkis 整合 Kyuubi 實踐過程

下面介紹 Linkis 和 Kyuubi 的整合流程,Linkis 支援新增一個自定義引擎的,支援多種的引擎型別,我們選擇了其中的 ComputationExecutor:這個型別,集成了它的方法,因為它是常用的互動式引擎 Executor,能夠處理互動式執行任務,並且具備狀態查詢、任務 kill 等互動式能力。而 Kyuubi 引擎是一個互動式引擎,所以在此之上實現這個 Execute 是比較合適的。

實現這個引擎,主要是 com 檔案要引入 Linkis 的一個相關包,具體可以看 Linkis 的官方文件《如何快速實現新的底層引擎》。

我們要實現這幾個模組,KyuubiEngineConnPlugin 是啟動引擎的一個連線的入口;KyuubiEngineConnFactory 是實現一個引擎的管理,啟動引擎的一個整體的邏輯;KyuubiEngineLaunchBuilder 模組用於封裝引擎端管理,是解析啟動命令的;實際執行的場景得是 KyuubiExecutor,它直接和 Kyuubi Server 來互動,實現計算邏輯的執行的單元。主要是這幾大塊的實現,大致的程式碼架構如下圖所示。

Linkis 啟動和 Kyuubi 之間的互動主要是通過 Linkis 的 Gateway 轉發引擎的一個管理管理模組,通過 Linkis 的引擎管理模組會啟動引擎的聯結器管理,聯結器管理是多個的,對接不同的使用者、不同的引擎,來啟動執行 Kyuubi 引擎。Kyuubi 引擎再和 Kyuubi Server 之間建立會話,然後通過和 Kyuubi Server 之間互動,比如說查詢或者是 DDL 操作,會把這些返回的結果儲存到一個 HDFS 臨時目錄裡,該臨時目錄會返回之前的查詢結果給 Gateway,Gateway 再返回給使用者的一個實際客戶端。

引入 Kyuubi 之後整體架構如下圖。主要的變化是計算中介軟體,是由 Kyuubi 和 Linkis 共同來實現,SQL 模組都交予 Kyuubi 來管理,其他的模組還是 Linkis 來管理,同時和任務排程、計算引擎之間都打通了。後續我們希望能把 Scala 或者 Flink 之類的任務也都整合在 Kyuubi 之上。

Kyuubi 在一站式平臺使用場景

Kyuubi 在一站式平臺的實際使用場景,可以看到 DSS 資料開發模組,我們直接增加了一個 Kyyubi 的型別。Kyuubi的型別直接對接 Kyuubi 服務,可以在上面通過一些 SQL 語句進行資料開發,並且實現一站式的開發和 CI/CD 的管理。

開發好的指令碼可以任務編排,任務編排裡集成了 Kyuubi 型別,可以直接把 Kyuubi 作為一個元件來進行任務編排,關聯已經編寫好的指令碼。這些編寫好的指令碼有一個釋出的功能,這個釋出功能是和 DSS 打通的,釋出到 DSS 之上的時候相當於一個 SQL 模組,就是把已經寫好的一些指令碼,使用 Kyuubi 的資料來源就釋出到釋出到 DS 之上,這就形成了一整套 CI/CD 的過程。

之前 DSS之上對 Linkis 有一系列的 WebUI 的監控管理,而 Kyuubi 在這方面是沒有的,所以我們在這基礎上加強了 Kyuubi 的後臺管理功能,單獨開發了一個 Kyuubi Web 的 Server 模組,通過 UI 對使用者的操作進行統一的管理監控,主要是對 Kyuubi Server 端進行連線數、引擎數量、JVM 的監控。

還有對使用者會話的 Session 能進行接通,這個 Session 直接是呼叫 Server 端 Session API 獲取一些 Session 狀態的介面,然後存到一個 MySQL 的持久化儲存,同時也可以手動調 Server 端的 API 去關閉 Session。

此外對使用者提交使用的一些語句也都能展示,或者統一管理,這樣方便後臺管理員來管控使用者的操作,同時回溯問題的時候也會比較方便。

小結一下,T3出行大資料平臺引入 Apache Kyuubi 後,和 Linkis 功能互補,實現了程式碼開發、業務上線與排程系統的打通,同時可以收口做到大資料開發 CI/CD 管理,幫助業務部門低門檻上線大資料相關的需求,減輕了資料開發的壓力,向我們一站式開發平臺的目標更進了一步。也期望 Apache kyuubi 和 Linkis 作為計算中介軟體的 引領者越來越好!

作者:楊華,李心愷

附影片回放及PPT下載:

T3 出行 Apache Kyuubi FlinkSQLEngine 設計和相關實踐

延伸閱讀:

eBay 基於 Apache Kyuubi 構建統一 Serverless Spark 閘道器的實踐

Apache Kyuubi 在 T3 出行的深度實踐

Who is using Apache Kyuubi (Incubating)?

Kyuubi專案主頁

Kyuubi程式碼倉庫