Apache Hudi 新版本新特性超強解讀

語言: CN / TW / HK

本次分享主要是針對Apache Hudi 0.11.0新版本新特性進行深度解讀,主要介紹4個方面的內容:

  • 多級索引
  • Spark SQL 新功能
  • Flink 整合改進
  • 其他功能和提升

01多級索引

首先和大家分享下多級索引,接下來我們將從三個方面介紹它。第一是為什麼我們引入多級索引multi model index,第二多級索引的設計以及實踐,最後將介紹如何利用多級索引,極大提升讀寫效能。

1. 為什麼要引入多級索引(Multi-Modal Index)

在介紹多級索引之前,我們先看看 索引是什麼? 索引是資料庫系統中常用的查詢加速技術。通過構建索引就可以利用生成的元資料metadata快速定位查詢所需資料的位置,這樣可以減少甚至避免從檔案系統中掃描或者讀取不必要的資料,減少IO的開銷,大大提升查詢效率。我們可以類比圖書館以及教科書中的索引,這些都是通過利用提前生成好的metadata來快速找到想要查詢的資訊。

其實在Apache Hudi的湖倉一體的架構中已經提供了特有的索引支援,這裡我們可以看一個例子。其中我們用的一種索引可以快速定位到我們所需要更新或者刪除的記錄所在的檔案組。如下圖所示:

如果沒有index的情況下,對於所有的更新以及刪除的記錄,我們需要和所有的檔案進行merge,這樣開銷會非常大。如果使用了索引,讀寫開銷會極大地降低,可以提高查詢這個位置的效率。

在Hudi中我們預設開啟了基於bloom filter的索引。這些bloom filter是儲存在資料檔案的footer中。作為一個單獨的存在,它可以被用到索引的流程過程中。

那我們為什麼還要在Hudi中引用多級索引? 其實索引的主要目的就是剛剛提到的提升資料查詢的速度,那麼就需要儲存metadata。而對於TB、EB級別表中的metadata,它們的量級會很大。通常的做法是把它們儲存於單獨的資料塊block或者是資料檔案中。而我們在實測的過程中,遇到了讀寫瓶頸。同時,在維護metadata的過程中,它需要實時的和資料表進行同步。這樣的管理十分複雜,因為要保證transaction,如果我們要在Hudi中引用新的索引開發週期會很長。

2. 多級索引的設計與實現

為了解決上述的問題,我們在湖倉一體的儲存架構中引入了多級索引,是首次在類似的架構中引入統一平臺、多元化、高效能的索引。我們的目標是支援所有計算及查詢引擎來提升讀寫效能,甚至未來如果出現新的引擎,也是可以進行相容的。

接下來我們介紹一些在多級索引設計中所需要的需求。

  • 第一是我們需要保證可拓展的元資料(scalable metadata) 。我們希望元資料是Serverless的,是不需要任何的計算或者是記憶體中需要支援的,可以獨立存在於數倉一體和資料湖中。同時希望它能獨立於計算及查詢引擎,它是可拓展的,能高效能地支援不同索引型別。
  • 第二是我們希望多級索引中的元資料和資料表保持實時同步, 保證每次的更新都是事務性的。
  • 第三是保證查詢的速度 。保證對於多級索引的查詢是低延遲的,主要的查詢型別包括point, range, and prefix lookups等。

我們分別來看一下是如何實現的。

首先是可拓展的元資料,我們採用了和已有資料庫類似的設計,那就是在內部構建一個元資料表meta table。對於Hudi table來說,我們利用的是Hudi的mor表來儲存這些資料。mor表的優勢是可以很快地進行資料的更新與刪除。同時,Hudi表也是Serverless的,它不會依賴任何計算及記憶體資源。在這個表裡我們針對不同的索引是建立獨立的分割槽的。在這樣的情況下,不同的index可以完成獨立的管理以及自動化的管理。我們在使用mor表的另一個優點是可以支援任意的索引大小。從mb級別到gb級別再到tb級別。針對獨立的分支,我們可以引入新的作用型別,就只需要建立新的分割槽。在構建可拓展的元資料的時候,需要索引的一個初始化。我們提高了兩種方式的初始化。一種是同步,同步是在寫入表的過程中,在最後commit之前會做一個index的步驟。而第二種方式是非同步。非同步建立索引是hudi首次引入的,保證了concurrent writer 不受影響。 下面是非同步建立索引的流程圖:

  • 第二點的設計原則是保證對metadata table的更新是事務性的,來保證metadata table結構裡面的資料要和資料表實時同步。我們設計了一套叫multi table多表的transaction。同時在這個metadata table 裡,有自我管理的表服務,包括compaction, cleaning。它們會保證定時操作,以保證這個metadata table 的讀效能會很好。
  • 第三點是對於metadata的快速查詢。我們使用了HFile作為MDT的資料格式。原因是列格式Parquet或基於行的Avro不適合 pointed lookup;HFile格式中的index使得 pointed lookup非常高效,只需要讀取相關資料塊。我們針對HFile做了一個測試,得出在千萬(10M)+條目中查詢 N 條目,HFile 相比於 Parquet、Avro 有 10-100倍的提升。如下圖:

3. 利用多級索引極大提升讀寫效能

接下來介紹多級索引所帶來的主要的讀寫效能提升。

  • 首先是File Listing

在雲端儲存中我們發現大部分情況下,如果對於大型表的上千分割槽以及百萬級的資料檔案做listing,會造成讀寫瓶頸。這主要是因為雲端儲存的設計所導致的。如果我們利用metadata table中的files來做分割槽。這個分割槽裡提供了這個資料表裡所有的file。相比於雲軟體系統有2-20倍的提升。如下圖:

  • 另一個比較重要的特性是Data Skipping

Data Skipping 技術是利用列統計資料來對所需要的這個資料檔案做file pruning (檔案裁剪),列統計資料常見的列統計資料包括取最大值、最小值、數量、大小等。Data Skipping 的作用就是通過這些統計資料來排除掉不需要讀的檔案,這樣可以極大的提高查詢速度。我們在這個multi model index 的metadata 中構建了column_stats分割槽,這個分割槽裡的每條記錄包含了這個Hudi表裡所對應檔案的列統計資料。每個Record key是由列名稱、分割槽名稱和檔名稱組成。通過這種排列格式,可以快速定位所需的column stats。查詢複雜度是基於查詢列所需要的列的數量,通常這個數量是5到10個。對於大寬表來說,這樣可以極大地提升這個效果。在實際測試中,雲上Hudi大寬表的“定向”查詢速度有10x-30x的提升。大幅減少了對無關資料檔案的掃描和讀取,提高了I/O效率。 如下圖:

  • 我們還對Upsert 效能做了測試。我們在metadata table中引入了一個bloom_filter 分割槽,取代了footer 中的 bloom filter,在大表中可以大幅減少檔案讀取的時間。每條記錄包含單個數據檔案(partition + file name)的 bloom filter,支援 prefix lookup。據實測,在有 100k 檔案的 Hudi 表中,相比於讀取 footer,從 MDT 讀取 bloom filter的速度有3倍的提升。t6.c
  • 基於多級索引,未來還有很多工作可以做,目前一個工作是針對record level index 的開發以及Luncene index的開發。

02 Spark SQL新功能

接下來再講Hudi在Spark SQL方面的改進。

1. 使用者可以使用非主鍵欄位更新或刪除 Hudi 表中的記錄

Spark SQL改進Delete Operation。

在t1時刻分別往mor表cow表中分別插入a,b,c三條資料。這樣會在mor表中生成base file 檔案和log file檔案(下圖中簡化了示意圖)。在cow表中只會生產base file 檔案。t2時刻同時刪除mor表和cow表中 b的資料。mor表操作是刪除log file b的block是t2時刻的資料。而cow表中的操作是複製一份base file b儲存到記憶體中,刪除b資料之後會形成一個新版本的綠色的方框中的資料檔案。如下圖:

2. SQL支援時間旅行查詢

我們為什麼要實現Time travel?從api層面,如果我們要寫一個查詢,需要設定不同的df,構造不同的operation,然後來查詢這個動作。但是引用time travel的這個語句以後,一是可以在spark sql中直接使用,二是sql語句更容易去解釋這樣的一個行為和動作。現在可以通過timestamp as of語法支援時間旅行查詢,但僅限Spark 3.2+。語法如下:select * from hudi_tbl timestamp as of '20210728141108100' 

①SQL Travel-場景1:查詢多版本資料

如下圖,我們在10:10分提交了insert和update語句,我們想要查10:05分版本的資料,通過下面的sql是可以實現的。

select * from test_hudi timestamp as of 20220512100510000 (10:05)

select * from test_hudi timestamp as of 20220512101030000 (10:10)

②Travel-場景2: 資料還原修復

  • 建立hudi表:create table test_hudi ...
  • 插入資料:insert into test_hudi ... (每插入一條資料會產生一個版本)

  • 查詢資料:select * from test_hudi

  • 誤刪資料:delete from test_hudi where id=2
  • 查詢資料:select * from test_hudi (刪除id=2的資料後只剩下了兩個版本)

  • 還原資料:insert into test_hudi_table select * from test_hudi timestamp as of 20220511165343733(資料如果存在的話update,如果資料不存在就insert)
  • 查詢資料:select * from test_hudi (id=2的資料已經還原回來了)

注意:如果這個表用了 truncate清空的話,這種時間戳方式查詢恢復就不行了。 

3. 新增CALL命令以支援在 Hudi 表上呼叫更多操作

Call Command產生的一個背景是spark sql除了ddl、dql和dml之外的操作,我們想解決這三種操作之外的一些新功能的操作。那麼在引入Call Command之前是沒辦法操作的。我們對比了一下傳統資料庫裡面的儲存過程,類似地在spark這一塊實現了一個command動作,然後對應實現了一個procedure的功能。

首先是在hudi一側為call command生成了一個通用的語法,不依賴於spark的版本,可以對所有spark版本適用。然後生成了一個HoodieProcedure的類,使用CallProcedureHoodieCommand類呼叫動作。

Call Command命令在設計時主要有四個方面的功能。一是支援歸檔、提交、回滾和建立還原點的快照動作。二是可以進行原資料管理。三是對運維表進行資料匯入匯出、Boostrap、修復表、升級/降級等操作。四是優化表動作,如Compaction、Clustering、Clean等。

Call Command的引數有三類。一是可以使用不定式的引數(鍵值對)作為它的入參引數。二是可以按照引數的位置進行入參。三是混合引數。以下是傳參的具體語法:

  • 接下來是CALL Command功能方面的一些介紹。

①CALL Command-快照管理的相應的命令

  • 查詢快照 :call show_commits_metadata(table => 'test_hudi_table', limit => 10)
  • 回滾快照:

call rollback_to_instant('test_hudi_table',20220511224632307')

  • 建立儲存點 :create_savepoints
  • 恢復儲存點 :rollback_savepoints
  • 刪除儲存點 :delete_savepoints

②CALL Command-Clustering

可以設定Clustering的型別:

set hoodie.layout.optimize.strategy=linear /z-order/hilbert

常用命令:

call run_clustering(table => 'test_hudi_table',order => 'ts') 

call show_clustering(table=>'test_hudi_table')

通過這些clustering的動作,在查詢的時候效能能達到10-20倍的提升。

③CALL Command-Compact(小檔案合併)(目前只支援mor表)

Data file和Delta log file 合併會重新生成一個新的檔案。

命令:call run_compaction(table=>'test_hudi_table',op=> instant

03 Flink整合改進

最後,介紹Flink整合改進方面的內容。主要有以下幾點:

1. 在 0.11.0 中,同時支援 Flink 1.13.x 和 1.14.x。

2. 支援複雜的資料型別。

3. 基於 DFS 的 Flink HoodieCatalog。

綠色是目前已經實現的操作,紅色是待實現的操作。可以通過ApI實現或者是Flink Sql實現。

FlinkHoodieCatalog在三個方面有比較好的作用:

  • 第一個方面是對元資料的管理,通過FlinkHoodieCatalog這個框架可以更好地管理HoodieCatalog;
  • 第二個方面是可以基於這個框架可以去和外面的其他的平臺去對接,更方便對接使用;
  • 第三個方面是元資料方面,可以在資料血緣方面構造一些功能。

4. Bucket Index 

為什麼要整合改進Bucket Index?這個是位元組的同學貢獻的一個功能。他們在他們的生產場景裡面,在34tb的資料量在5000億的記錄寫入的情況下,Bloom Filter Index 通過Record key去找File ID 的這樣的一個動作的效能會下降得很快。為了解決Bloom Filter Index的假陽性,他們引入了Bucket Index。

通過key的雜湊值定位到File Group,提升了實時匯入的效能。如下圖所示:

從Flink輸入了5條資料,然後通過一定的雜湊策略將混合的Bucket Index進行關聯,通過拿到FileGroupId寫入檔案。

  • 利用Bucket分佈做優化

Bucket分佈優化主要有:Bucket Pruning、Bucket Aggregate、Bucket Join等。如下圖所示:

  • Bucket限制。目前 Bucket Index 的桶數量 ,需要根據預估的資料量提前在建表時進行確定,且建表後不可更改。
  • Bucket使用

引數:hoodie.index.type 值:BUCKET

引數:hoodie.bucket.index.num.buckets 值:48(256MB)

建議單個桶的大小控制在3GB左右。

  • Bucket後續改進。Hashmap的擴容過程,將分桶數按倍數做到輕量級動態擴容。

04其他功能和提升

最後,我們來講其他功能和提升。

1. Spark DataSource 查詢優化

我們在使用mor表做快照查詢的時候,log檔案會被讀取,然後和base檔案進行合併。在之前的版本中,當你做快照查詢的時候,整條log檔案記錄會被讀出來。這個版本我們做了優化,使用了內建的標準Payload來讀取。例如:OverwriteWithLatestAvroPayload。我們會針對這個做了優化,只把必要的列讀出來,這樣就會極大的減少記憶體和壓縮解碼帶來的CPU的消耗。其實是對於非常寬的上千列的表來說,效果會非常明細。

2. Schema 演進 

在這個版本中,我們針對Spark 3.1、Spark 3.2版本增加了schema功能的演進。如果啟用 set hoodie.schema.on.read.enable=true以後,我們可以對錶列和對錶進行一系列的操作。列的變更(增加、刪除、重新命名、修改位置、修改屬性),表的變更(重新命名、修改屬性) 等。

3. 儲存點和恢復 

儲存點和恢復可以用call command做這些操作。新版本引進了mor表,用Hudi CLI設定儲存點和執行恢復或者call command來手動設定儲存點。儲存點之後的資料將會被刪除。

4. Pulsar 寫提交回調 

  • Hudi寫入commit支援Pulsar下游作業* 
  • 主要配置 HoodieWriteCommitPulsarCallback 

hoodie.write.commit.callback.pulsar.topic

hoodie.write.commit.callback.pulsar.broker.service.url

  • 可擴充套件的介面HoodieWriteCommitCallback 

5. 多元化的Catalog同步

  • Google BigQuery 支援查詢COW表* 
  • DataHub 支援同步Schema 
  • 通過AWS SDK原生同步Glue Data Catalog