Apache Hudi X Apache Kyuubi,中國移動雲湖倉一體的探索與實踐

語言: CN / TW / HK

分享嘉賓:孫方彬 中國移動雲能力中心 軟體開發工程師

編輯整理:Hoh Xil

出品平臺:DataFunTalk

 

導讀:在雲原生+大資料的時代,隨著業務資料量的爆炸式增長以及對高時效性的要求,雲原生大資料分析技術,經歷了從傳統數倉到資料湖,再到湖倉一體的演進。本文主要介紹移動云云原生大資料分析LakeHouse的整體架構、核心功能、關鍵技術點,以及在公有云/私有云的應用場景。

主要內容包括:

  • 湖倉一體概述

  • 移動雲LakeHouse實踐

  • 應用場景

01 湖倉一體概述

1. 關於湖倉一體

“湖倉一體”是最近比較火的一個概念,“湖倉一體”的概念最早起源於Databricks公司提出的Lakehouse架構,它不是某個產品,而是資料管理領域中的一種開放的技術架構範例。隨著大資料和雲原生技術的發展和融合,湖倉一體更能發揮出資料湖的靈活性與生態豐富性,以及資料倉庫的成長性。這裡的成長性包括:伺服器的成本,業務的效能,企業級的安全、治理等特性。

大家可以看到(左圖),在特定業務規模前,資料湖的靈活性有一定優勢,隨著業務規模的增長,資料倉庫的成長性更有優勢。

湖倉一體的2個關鍵點:

  • 湖和倉的資料/元資料在不需要使用者人工干預的情況下,可以無縫打通、自由順暢地流(包括:由外向內入湖、由內向外出湖、圍繞周邊環湖);

  • 系統根據特定的規則自動地將資料在湖倉之間進行快取和移動,並能與資料科學相關的高階功能打通,進一步實現敏捷分析和深度智慧。

2. 主要理念

隨著業務資料量的爆炸式增長以及業務對高時效性的要求,大資料分析技術經歷了從傳統數倉到資料湖,再到湖倉一體的演進。傳統基於Hadoop的大資料平臺架構也是一種資料湖架構,湖倉一體的核心理念以及與當前Hadoop叢集架構的區別大致如下:

  • 儲存多種格式的原始資料:當前Hadoop叢集底層儲存單一,主要以HDFS為主,對於湖倉一體來說,逐漸會演進為支援多種介質,多種型別資料的統一儲存系統

  • 統一的儲存系統:當前根據業務分多個叢集,之間大量資料傳輸,逐漸演進到統一儲存系統,降低叢集間傳輸消耗

  • 支援上層多種計算框架:當前Hadoop架構的計算框架以MR/Spark為主,未來演進為在資料湖上直接構建更多計算框架和應用場景

湖倉一體的產品形態大致有兩類:

  • 基於公有云上資料湖架構的產品和解決方案(例如:阿里雲MaxCompute湖倉一體、華為雲FusionInsight智慧資料湖)

  • 基於開源Hadoop生態的元件(DeltaLake、Hudi、Iceberg)作為資料儲存中間層(例如:Amazon智慧湖倉架構、Azure Synapse Analytics)

02 移動雲LakeHouse實踐

下面介紹移動雲LakeHouse的整體架構及對湖倉一體的探索和實踐:

1. 整體架構

上圖是我們的整體架構圖,名字叫雲原生大資料分析LakeHouse。雲原生大資料分析LakeHouse採用計算和儲存分離架構,基於移動雲物件儲存EOS和內建HDFS提供了支援Hudi儲存機制的湖倉一體方案,通過內建Spark引擎進行互動式查詢,可以快速洞察業務資料變化。

我們的架構具體包括:

  • 資料來源:包括RDB、Kafka、HDFS、EOS、FTP,通過FlinkX一鍵入湖

  • 資料儲存(資料湖):我們內建了HDFS和移動雲的EOS,藉助Hudi實現Upsert能力,達到近實時的增量更新,我們還適當地引入Alluxio,進行資料快取,來達到資料分析的SQL查詢加速能力。

  • 計算引擎:我們的計算引擎都是Severless化的,跑在Kubernetes中。我們引入了統一資源訪問/排程元件YuniKorn,類似於傳統Hadoop生態體系中YARN的資源排程,會有一些常見的排程演算法,比如共性排程,先進先出等常見的排程

  • 智慧元資料:智慧元資料發現,就是將我們資料來源的資料目錄轉化成內建儲存中的一個Hive表,統一進行元資料管理

  • 資料開發:SQLConsole,使用者可以直接在頁面上編寫SQL進行互動查詢;還有SDK的方式,以及JDBC/ODBC介面;後續我們會支援DevIDE,支援在頁面上的SQL開發

2. 核心功能

核心功能主要有以下四方面:

① 儲存和計算分離:

  • 儲存層與計算層分離部署,儲存和計算支援獨立彈性擴縮容,相互之間沒有影響

  • 儲存支援物件儲存和HDFS,HDFS儲存結構化資料,提供高效能儲存,物件儲存儲存非結構化、原始資料、冷資料,提供高性價比

  • 計算支援多引擎,Spark、Presto、Flink均實現serverless化,即開即用,滿足不同查詢場景

② 一鍵入湖:

  • 支援連線移動云云上雲下多種資料庫、儲存、訊息佇列

  • 入湖流程自動化,降低使用者的配置成本

  • 降低對資料來源的額外負載,控制在10%以內,支援根據資料來源的例項規格自動調整連線數(比如在MySQL同步資料時,會在MySQL負載允許的情況下,自動調整連線數)

  • 支援增量更新(通過Hudi實現增量更新)

③ 智慧元資料發現:

  • 基於特定的規則,智慧識別結構化、半結構化檔案的元資料,構建資料目錄

  • 自動感知元資料變化

  • 統一元資料,提供類HiveMeta API,針對不同計算引擎訪問底層資料

  • 智慧資料路由和許可權統一管控(藉助移動雲的賬號體系和Ranger實現的)

④ 按量計算:

  • 儲存資源按照使用量計費

  • 計算資源支援多種計費模式

  • 支援彈性調整租戶叢集資源規格,快速擴縮容

3. 基於RBF的邏輯檢視

在基於Hive構造的資料湖體系中,每個Hive db通常對應一個數倉例項,共享統一的儲存HDFS,為了實現儲存資源的多租戶隔離特性,我們借鑑RBF的統一檢視隔離能力,通過Zookeeper上不同的Znode來隔離多個數倉例項StateStore,使每個數倉擁有自己獨立的邏輯檢視,同時利用RBF掛載多NameSpace的能力來實現NameNode負載均衡的效果。此外,為順應雲原生趨勢,我們將RBF服務容器化部署,在建立Hive db時指定由RBF構成的HDFSschema路徑,可以實現資源快速的建立、擴充套件和回收。

上圖是我們的一個簡單的架構圖,RBF以Pod的形式部署在Kubernetes中,然後Hivedb分別對映為一個RBF的schema路徑。然後,下面是藉助了NameSpace的負載均衡能力。

這樣,通過為使用者提供單獨的儲存邏輯檢視,不僅可以隔離不同數倉例項之間的資料,又能借助RBF對底層HDFS的負載均衡來實現對Hive資料的負載均衡能力。

例如,對Hive db目錄hivedbdir通過RBF方式mount到兩個Namespace,掛載命令如下:

$ hdfs dfsrouteradmin -add/hivedbdir ns1,ns2 /data -order HASH_ALL

4. Hive在物件儲存的多租戶實現

在公有云場景下,不同使用者的bucket認證資訊不同,需要多次配置並重啟HiveServer服務,無法在物件儲存上實現Hive多租戶的效果。為解決這個問題,我們通過修改Hive原始碼在表屬性tblproperties中新增s3的認證引數,在訪問bucket時載入表屬性中的認證資訊至threadlocal conf變數,來完成session級別的認證引數傳遞。這樣就在不重啟Hive服務的情況下支援了多bucket認證,達到了物件儲存上的Hive多租戶效果。

如圖所示,如果在服務端為使用者配置不同的引數,就需要重啟服務,這時不能夠接受的。經過我們的改造之後,建表語法就變成了下面這種格式:

create external table testcephtbl(id int) location 's3a://bucket1/tmp/testlocation' tblproperties('fs.s3a.access.key'='xxx,'fs.s3a.endpoint'='xxx','fs.s3a.secret.key'='xxx);

5.優化引擎訪問物件儲存

在大資料生態中,多種計算引擎都可以通過Metastore服務訪問Hive中的資料,例如SparkSQL要訪問存在物件儲存中的Hive資料,需要在提交作業的Driver模組中根據表的location資訊載入對應bucket認證資訊,SQL提交命令如下:

$SPARK_HOME/bin/beeline-u “jdbc:hive2://host:port/default?fs.s3a.access.key=xxx;fs.s3a.endpoint=xxx;fs.s3a.endpoint=xxx”-e “selecta.id from test1 a join test2 on a.id=b.id”

也就是說,使用者需要感知資料是存在物件儲存中,並且很難確定一個SQL中的多個表屬於哪幾個bucket,嚴重影響了業務開發進度。為此,我們基於之前的Hive表屬性實現了獲取物件儲存認證引數外掛,使用者無需感知SQL中的表來自哪個bucket,也無需在提交SQL時指定認證引數。如上圖橙色框所示,Spark SQL在Driver中實現引數,來匹配認證引數資訊。對MetaStore來說是一個統一的訪問檢視。

最終提交SQL作業命令如下:

$SPARK_HOME/bin/beeline -u “jdbc:hive2://host:port/default”-e “select a.id from test1 a join test2 ona.id=b.id”

6. Serverless實現

這裡以Spark為例,通過RBF的多租戶實現,Spark程序執行在安全隔離的K8S Namespace中,每個Namespace根據資源規格對應不同的計算單元(例如:1CU=1 core * 4GB)。對於微批的場景,使用SQL Console每提交一個task,engine模組會啟動一個Spark叢集,為Driver和Executor按特定的演算法申請相應的計算資源來執行計算任務,任務結束後資源即刻回收;對於即席ad-hoc的場景,可以使用JDBC提交task,engine模組通過Kyuubi服務啟動一個session可配置的spark叢集,長駐一段時間後回收資源;所有的SQL task只有在執行成功後按實際的資源規格計費,如果不使用是不收費的。

邏輯檢視如上,我們的Kubernetes通過每個Namespace把資源進行隔離;上面是一個統一排程的YuniKorn進行Capacity Management/Job Scheduling的排程。再往上是SQL Parser元件,會把SparkSQL和HiveSQL語法進行相容;最上方,我們還提供了Spark JAR的方式,能夠支援分析HBase或者其它介質中結構化/半結構化的資料。

通過Serverless的實現,我們大大的降低了使用者的使用流程。

沒有用Serverless時的流程:

① 購買伺服器,構建叢集

② 部署一套開源大資料基礎元件:HDFS、Zookeeper、Yarn、Ranger、Hive等

③ 利用不同工具匯入資料

④ 編寫查詢SQL計算,輸出結果

⑤ 各種繁瑣的運維

使用Sercerless後的流程:

① 註冊移動雲賬號,訂購LakeHouse例項

② 建立資料同步任務

③ 編寫查詢SQL計算,輸出結果

④ 服務全託管,全程無運維

7. 元資料管理與發現

元資料管理模組基於特定規則,智慧識別結構化、半結構化檔案的元資料來構建資料目錄,通過週期性的元資料爬取實現自動感知元資料變化,並提供多種優化策略來降低爬取時對資料來源的負載;同時,提供類Hive Metastore的API供多種計算引擎直接對錶進行訪問:

元資料管理模組整體架構如左圖所示:通過元資料爬取RDB/EOS資料,格式有json/parquet/avro等常見的半結構化資料,然後是Hive MetaStore統一訪問層,計算引擎hive/spark/presto可以通過類metastore api來訪問存在湖中的資料,使用者通過Web UI進行目錄對映。

檔案類元資料發現過程,如右圖所示:有一張表,下面有幾個目錄,比如按year分開的,然後在某個具體目錄有兩個子目錄,對於它的元資料發現過程,就會出現3行的資料,id、name和type,就會對映成同一張表,然後不同的目錄是按不同的欄位進行分割槽。

8. Serverless一鍵入湖

為實現Serverless的入湖建立,我們採用了基於Flink的分散式資料同步框架FlinkX,來滿足多種異構資料來源之間高效的資料遷移,具備以下特點:

  • 資源彈性:作業執行在Kubernetes上,資源隔離,支援分散式執行和彈性擴縮容

  • 靈活性:將源/目標資料來源抽象成Reader/Writer外掛,支援雙向讀寫和多種資料來源

  • 易用性:操作簡化,支援批流一體、斷點續傳,可自動調整資料來源連線數,降低侵入性

上圖是我們通過FlinkX進行排程任務的流程:

  • 使用者通過JobManager建立並提交task配置,通過Quartz排程task,作業執行時呼叫Flink Kubernetes客戶端訪問Kubernetes Master創建出Flink Master所需要的資源,並啟動相應的Container;

  • Flink Master Deployment裡面內建一個使用者FlinkX Jar,這時Cluster Entrypoint就會從中去執行main函式,然後產生JobGraph;之後再提交到Dispatcher,Dispatcher會啟動一個 JobMaster向KubernetesResourceManager申請資源,RM發現沒有可用的資源會繼續向Kubernetes Master申請資源,請求資源之後將其傳送回去,啟動新的TaskManager;

  • TaskManager啟動之後再註冊回來,此時RM再向它申請slot提供給JobMaster,最後由 JobMaster將相應的FlinkX Task部署到TaskManager上。這樣整個Flink叢集的拉起,到使用者提交Jar都完成了。

我們的Flink叢集其實也是一種serverless的實現。

9. JDBC支援

為了提升不同使用者的資料分析體驗,我們基於Apache Kyuubi來支援多租戶、多種計算引擎的JDBC連線服務,Kyuubi具有完整的認證和授權服務,支援高可用性和負載均衡,並且提供兩級彈性資源管理架構,可以有效提高資源利用率。

在接觸Kyuubi前,我們嘗試使用了原生的Spark thrift server來實現,但是它有一定的侷限性,比如不支援多租戶,單點的不具備高可用,資源是長駐的,資源排程需要自己來管理。我們通過引入Kyuubi來支援多租戶和高可用,通過engine動態申請釋放,並且Kyuubi支援Yarn和Kubernetes資源排程。

在使用過程中,為了適配移動雲的賬號體系以及LakeHouse架構,我們對Kyuubi相應的模組進行了優化和改造,部分如下:

  • 使用者認證:基於移動雲AccessKey,SecretKey對接移動雲認證體系。

  • 資源管理:Kyuubi原生只支援使用者指定資源,基於雲原生適配後禁止使用者指定資源,統一由Lakehouse進行資源排程和分配。

  • 許可權管控:適配Lakehouse底層許可權管理體系,實現細粒度許可權的管控。

  • 雲原生部署:基於Helm3的kyuubi server雲原生部署,支援高可用和負載均衡

  • 物件儲存:支援物件儲存表識別和動態ak,sk許可權認證

10. 增量更新

我們使用Hudi作為資料儲存中間層,能夠基於HDFS、物件儲存等底層儲存,支援ACID語義、實現快速更新能力。常見的流場景如下:

  • 將Kafka/MySQL binlog中的資料藉助DeltaStreamer/CDC通過定時Flink任務寫入到Hudi表中

  • 通過Flink/Spark任務同步Hive元資料

  • 部分源資料修改

  • 使用者訪問和查詢資料

如右圖所示,我們封裝了Hudi自帶的DeltaStreamer / CDC,自定義FlinkX的Reader / Writer特性,實現serverless入湖和資料同步。

如左圖所示,我們比較了兩種資料格式:

  • 對於實時性要求不高的場景儘量使用COW(寫時複製)表型別,如果對資料新鮮度有一定要求則可使用MOR(讀寫合併)

  • MOR會比COW更容易產生小檔案並且對資源需求更高

以上就是移動雲Lakehouse實現的細節。

03 應用場景

最主要的場景是構建雲原生大資料分析平臺:LakeHouse支援多樣化資料來源,包括但不限於應用自身產生的資料、採集的各類日誌資料、資料庫中抽取的各類資料,並提供離線批處理、實時計算、互動式查詢等能力,節省了搭建傳統大資料平臺需投入的大量軟硬體資源、研發成本及運維成本。

另外,在私有云場景下,在充分利用現有叢集架構的前提下,以新增元件方式引入Lakehouse能力;引入數倉能力,適配多種資料統一儲存和管理;統一元資料,形成湖倉一體的元資料檢視:

  • Hadoop平臺檢視:Lakehouse作為Hadoop平臺上一個元件,能夠提供SQL查詢能力,並支援多種資料來源

  • 湖倉檢視:基於LakeHouse提供資料湖倉平臺,HDFS/OceanStor提供儲存,計算雲原生,多種服務統一元資料管理。

今天的分享就到這裡,謝謝大家。

分享嘉賓: