Hudi 原理 | 聊一聊 Apache Hudi 的原理(2)
這周我們會接著上週的話題,繼續聊一聊Hudi的實現原理,主要關注Hudi的核心讀寫邏輯,資料的儲存和處理邏輯,以及一些附屬的功能。正在使用或是考慮使用Hudi的朋友,
請不要錯過
,因為理解了實現原理以後可以避免很多使用上的坑,也能更好地發揮出Hudi的優勢。
今天我們會講一講Hudi這些功能的實現原理:
-
Merge on Read( MOR表 )
-
Transactional( 事務 )
-
Incremental Query( 增量查詢 )
由於這篇文章會用到上一篇文章中講到的知識,還沒有讀過的朋友,推薦先讀完上一篇文章。
Merge on Read (簡稱MO R表),是Hudi最初開源時尚處於“實驗階段”的新功能,在開源後的0.3.5版本開始才告完成。 現在則是Hudi最常用的表型別。
之所以在COW表之後又增加了一種新的表型別,原因在上一篇文章中也有提到
Merge on Read則是對Copy on Write的優化。優化了什麼呢?主要是寫入效能。
導致COW表寫入慢的原因,是因為 COW表每次在寫入時,會把新寫入的資料和老資料合併以後,再寫成新的檔案。 單單是寫入的過程(不包含前期的repartition和tagging過程),就包含至少三個步驟:
-
讀取老資料的parquet檔案(涉及對parquet檔案解碼, 不輕鬆 )
-
將老資料和新資料合併
-
將合併後的資料重新寫成parquet檔案(又涉及parquet檔案編碼, 也不輕鬆 )
就是有點兒不輕鬆
種種原因導致COW表的寫入速度始終快不起來,限制了其在時效性要求高,寫入量巨大的場景下的應用。(關於parquet檔案 不輕鬆 的原因,可以看這篇文章《詳解Parquet檔案格式》)
為了解決COW表寫入速度上的瓶頸,Hudi採用了另一種寫入方式: upsert時把變更內容寫入log檔案,然後定期合併log檔案和base檔案。 這樣的好處是避免了寫入時讀取老資料,也就避免了parquet檔案 不輕鬆 的編解碼過程,只需要把變更記錄寫入一個檔案即可(而且是順序寫入)。顯然是 輕鬆了不少 。
warehouse
├── .hoodie
├── 20220101
│ ├── fileId1_001.parquet
│ ├── .fileId1_20220312163419285.log
│ └── .fileId1_20220312172212361.log
└── 20220102
├── fileId2_001.parquet
└── .fileId2_20220312163512913.log
典型的MOR表的目錄,注意log檔案包含寫入的時間戳
有些朋友這時或許會有疑問,“這樣寫入固然是輕鬆了,但怎麼讀到最新的資料呢?” 是個好問題。 為了解決讀取最新資料的問題,Hudi提供了好幾種機制,但從原理上來說只有兩種:
-
讀取資料時,同時從base檔案和log檔案讀取,並把兩邊的資料合併
-
定期地、非同步地把log檔案的資料合併到base檔案(這個過程被稱為compaction)
第一種機制也是Merge on Read這個名字的由來,因為 Hudi的讀取過程是實時地把base資料和log資料合併起來,並返回給使用者 。注意這兩種機制不是非此即彼的,而是互為補充。Hudi的預設配置就是同時使用這兩種機制,即: 讀取時merge,同時定期地compact。
在讀取時合併資料,聽起來很影響效率。 事實也是如此 ,因為實時合併的實現方式是把所有log檔案讀入記憶體,放在一個HashMap裡,然後遍歷base檔案,把base資料和快取在記憶體裡的log資料進行join,最後才得到合併後的結果。難免會影響到讀取效率。
COW影響寫入,MOR影響讀取,那有沒有什麼辦法可以兼顧讀寫,魚與熊掌能不能得兼? 目前來說不能 ,好在Hudi把選擇權留給了使用者,讓使用者可以根據自身的業務需求,選擇不同的query型別。
對於MOR表,Hudi支援3種query型別,分別是
-
Snapshot Query
-
Incremental Query
-
Read Optimized Query
其中1和3就是為了平衡讀和寫之間的取捨。這兩者的區別是:Snapshot Query和上文所說的一樣,讀取時進行“實時合併”;Read Optimized Query則不同, 只讀取base檔案,不讀取log檔案 ,因此讀取效率和COW表相同,但讀到的資料可能不是最新的。
官方對兩種query型別的解釋
以上講完了Hudi和upsert相關的主要功能,接下來講講Hudi另一大特色功能: Transactional ,也就是 事務功能 。
Hudi的事務功能被稱為Timeline,因為Hudi把所有對一張表的操作都儲存在一個時間線物件裡面。Hudi官方文件中對於Timeline功能的介紹稍微有點複雜,不是很清晰。其實從使用者角度來看的話,Hudi提供的事務相關能力主要是這些:
特性 | |
---|---|
原子性 | 寫入即使失敗,也不會造成資料損壞 |
隔離性 | 讀寫分離,寫入不影響讀取,不會讀到寫入中途的資料 |
回滾 | 可以回滾變更,把資料恢復到舊版本 |
時間旅行 | 可以讀取舊版本的資料(但太老的版本會被清理掉) |
存檔 | 可以長期儲存舊版本資料(存檔的版本不會被自動清理) |
增量讀取 | 可以讀取任意兩個版本之間的差分資料 |
講完了功能清單,接下來就講一講事務的實現原理。內容以 COW表為主,但MOR表也可以由此類推,因為MOR表本質上是對COW表的優化。
這裡沿用上一篇文章中的例子,假設初始我們有5條資料,內容如下
txn_id | user_id | item_id | amount | date |
---|---|---|---|---|
1 | 1 | 1 | 2 | 20220101 |
2 | 2 | 1 | 1 | 20220101 |
3 | 1 | 2 | 3 | 20220101 |
4 | 1 | 3 | 1 | 20220102 |
5 | 2 | 3 | 2 | 20220102 |
實際儲存的目錄結構是這樣的(檔名做了簡化)
warehouse
├── .hoodie
├── 20220101
│ ├── fileId1_001.parquet
│ └── fileId1_002.parquet
├── 20220102
│ └── fileId2_001.parquet
└── 20220103
└── fileId3_001.parquet
它 的資料儲存在fileId1_001和fileId2_001兩個檔案裡。
我們稱呼這個版本為v1。 接下來我們寫入3條新的資料,其中1條是更新,2條是新增。
txn_id | user_id | item_id | amount | date |
---|---|---|---|---|
3 | 1 | 2 | 5 | 20220101 |
6 | 1 | 4 | 1 | 20220103 |
7 | 2 | 3 | 2 | 20220103 |
寫入後的目錄結構如下
warehouse
├── .hoodie
├── 20220101
│ ├── fileId1_001.parquet
│ └── fileId1_002.parquet
├── 20220102
│ └── fileId2_001.parquet
└── 20220103
└── fileId3_001.parquet
更新的1條資料(txn_id=3)儲存在fileId1_002這個檔案裡,而新增的2條資料(txn_id=6和txn_id=7)則被儲存在fileId3_001。
我們稱呼更新後的版本為v2。
Hudi在這張表的timeline裡(實際存放在.hoodie目錄下)會記錄下v1和v2對應的檔案列表。 當client讀取資料時,首先會檢視timeline裡最新的commit是哪個,從最新的commit裡獲得對應的檔案列表,再去這些檔案讀取真正的資料。
v1和v2對應的檔案
Hudi通過這種方式實現了多版本隔離的能力。當一個client正在讀取v1的資料時,另一個client可以同時寫入新的資料,新的資料會被寫入新的檔案裡,不影響v1用到的資料檔案。只有當資料全部寫完以後,v2才會被commit到timeline裡面。後續的client再讀取時,讀到的就是v2的資料。
順帶一提的是,儘管Hudi具備多版本資料管理的能力,但舊版本的資料不會無限制地保留下去。Hudi會在新的commit完成時開始清理舊的資料,預設的策略是“ 清理早於10個commit前的資料 ”。
最後再講講Hudi的另一個特色功能: Incremental Query( 增量查詢 )。 這個功能提供給使用者“ 讀取任意兩個commit之間差分資料 ” 的能力。 這個功能也是基於上述的“多版本資料管理”實現的,下面就來講講。
還是以上文的例子,假設我們想要讀取v1 → v2之間的差分資料
Hudi會計算出v2到v1之間的差異是兩個檔案:fileId01_002和fileId03_001,然後client從這兩個檔案中讀到的就是增量資料。
有些朋友或許會發現,fileId01_002裡面包含了兩條老資料txn_id=1和txn_id=2,不屬於v2到v1的差分資料,不應該被讀取。確實如此。其實Hudi對每一條資料,都有 一個隱藏欄位_hoodie_commit_time用於記錄commit時間 ,這個欄位會和其他資料欄位一起儲存在parquet檔案裡。Hudi在讀取parquet檔案時,會同時用這個欄位對結果進行過濾,把不屬於時間範圍內的記錄都過濾掉。
關於Hudi原理的講解 , 寫到這裡就 差不多告一段落了 。 接下來可能會繼續介紹”資料湖三劍客“的其他兩個——Iceberg和Delta,以及它們之間的對比。 如果朋友們對接下來想看到什麼內容有好的建議,歡迎在公眾號後臺留言。
- Hudi 實踐 | 華為雲 MRS 基於 Apache Hudi 極致查詢優化的探索實踐
- 數倉還是資料湖?亦或是湖倉一體?該如何選型?
- Flink 實踐 | B站流式傳輸架構的前世今生
- Spark 實踐 | 蘋果工程師分享如何運維超大規模 Spark on Kubernetes 叢集
- Dynamo 解讀 | 面對 Dynamo 我們又將會學到怎樣的產品與技術設計哲學!
- Doris 解析 | Apache Doris 極速1.0版本解析與未來規劃
- Kyuubi 應用 | Kyuubi Playground 站點上線啦
- Kyuubi 實踐 | 如何優化 Spark 小檔案,Kyuubi 一步搞定!
- HBase 原理 | 淺析 Region split 引入 HFileLink 優化子 Region 操作實現原理
- Flink 解析 | Flink 原始碼:廣播流狀態原始碼解析
- Hudi 方案 | 網際網路場景下基於拉鍊表的全量表極限儲存優化方案
- Presto 實踐 | Presto 使用一致性雜湊改善動態叢集的快取命中率
- Presto 實踐 | Presto 在B站的實踐
- Iceberg 剖析 | Iceberg 是如何提高查詢效能的
- HDFS 實踐 | HDFS 在B站的探索和實踐
- Flink 實踐 | Apache Flink 在米哈遊的落地實踐
- Hudi 實踐 | 如何基於 CDH6 環境編譯 Hudi-0.9.0
- Hudi 原理 | 聊一聊 Apache Hudi 的原理(2)
- 資料湖 | 如何打造一款極速資料湖分析引擎
- Iceberg 實踐 | 胡爭:如何基於 Flink 和 Iceberg 構建雲原生資料湖?