Hudi Bucket Index 在位元組跳動的設計與實踐

語言: CN / TW / HK

由位元組跳動資料湖團隊貢獻的 RFC-29 Bucket Index 在近期合入 Hudi 主分支,本文詳細介紹 Hudi Bucket Index 產生的背景與實踐經驗。

文 | 位元組跳動資料平臺數據湖團隊

Hudi 是一個流式資料湖平臺,提供 ACID 功能,支援實時消費增量資料、離線批量更新資料,並且可以通過 Spark、Flink、Presto 等計算引擎進行寫入和查詢。Hudi 官方對於檔案管理和索引概念的介紹如下:

Hudi提供類似 Hive 的分割槽組織方式,與 Hive 不同的是,Hudi 分割槽由多個 File Group 構成,每個 File Group 由 File ID進行標識。File Group 內的檔案分為 Base File ( parquet 格式) 和 Delta File( log 檔案),Delta File 記錄對 Base File 的修改。Hudi 使用了 MVCC 的設計,可以通過 Compaction 任務把 Delta File 和 Base File 合併成新的 Base File,並通過 Clean 操作刪除不需要的舊檔案。 Hudi 通過索引機制將給定的 Hudi 記錄一致地對映到 File ID,從而提供高效的 Upsert。Record Key和 File Group/File ID 之間的這種對映關係,一旦在 Record 的第一個版本確定後,就永遠不會改變。簡而言之,包含一組記錄的所有版本必然在同一個 File Group 中。 在本文中,我們將重點介紹 Hudi 索引機制相關的作用和原理,以及優化實踐。

Hudi索引的作用與型別

索引的作用

在傳統 Hive 數倉的場景下,如果需要對一個分割槽資料做更新,整個更新過程會涉及三個很重的操作。舉一個更直觀的例子。假設一個 Hive 分割槽存在 100,000 條記錄,分佈在 400 個檔案中,我們需要更新其中的 100 條資料。這三個很重的操作分別是:

  1. 從 400 個檔案中讀出 100,000 條資料
  2. 與 100 條更新的資料做分散式關聯,取最新值
  3. 將更新後的 100,000 條資料寫入臨時目錄,最後覆蓋原先的資料

由此可以引出三個問題:

  1. 讀那麼多檔案是必要的嗎?
  2. 更新那麼多檔案是必要的嗎?
  3. 分散式關聯是必要的嗎? 假設在資料分佈最糟糕的情況下,需要被更新的 100 條資料分佈在 100 個檔案中。那我們實際需要讀和更新的檔案是多少個? 答案是 100 個,只佔總量的 1/4。 因此,Hudi 為了消除不必要的讀寫,引入了索引的實現。在有了索引之後,更新的資料可以快速被定位到對應的 File Group,以下面的官方的示意圖為例,
  • 避免讀取不需要的檔案
  • 避免更新不必要的檔案 無需將更新資料與歷史資料做分散式關聯,只需要在 File Group 內做合併

索引的型別

索引是獨立模組, 開源 Hudi 主要提供以下兩種索引: 在本文中,我們將介紹一個新的 Hudi 索引模組 Bucket Index 在位元組跳動的設計與實踐。

Bucket Index產生背景

索引帶來的效能收益是非常巨大的, 儘管 Hudi 已支援 Bloom Filter Index、Hbase index型別,但在位元組跳動大規模資料入湖、探索分析等場景中,我們仍然碰到了現有索引型別無法解決的挑戰,因此在實踐中我們開發了 Bucket Index 的索引方式。

業務場景挑戰

位元組跳動某業務部門需要利用實時資料計算各種指標。在其業務場景中存在定期批量寫入和流式寫入場景,整個流程可以描述如下:

  1. 批量場景會先將 binlog 匯入儲存到 Hive 離線倉庫中,再按照小時/天級粒度更新資料湖。
  2. 實時場景則通過 Flink 消費更新的 kafka 資料,寫入資料湖,供下游業務使用。
  3. 當源頭資料中的記錄存在主鍵重複的情況下,需要保留最新一條資料即可。
  4. 在分析側,業務會基於 Hudi 資料集,通過 Presto/Spark 查詢引擎,構建視覺化的 BI 報表看板,供運營或分析師自助進行近實時資料分析。 隨著入湖的資料量增加,Hudi 中生成了約 40,000 個 File Group。雖然該業務部門使用了 Hudi 索引避免了全域性合併操作,但是隨著 File Group 的數量以及儲存的資料量增加,定位 File Group 的時間也在增加,這造成了 Upsert 速度逐漸緩慢的情況,這嚴重影響了任務產出時間,甚至導致任務無法跑下去。

分析與對策

為了解決 Upsert 資料場景逐步緩慢的情況,位元組跳動資料湖團隊對整體的效能下降原因做了進一步分析,並針對性地提出瞭解決方案。

  • 原先的業務場景使用了預設的 Bloom Filter Index 的索引方式。在觀察中,團隊發現最終在資料量約 30TB 的場景下,定位 Record 的效能會非常糟糕,此時一共產生了約 5 千億條記錄分佈在40,000 個 File Group 中。
  • 在 5 千億條記錄的資料規模下,團隊發現定位緩慢的問題來自 Bloom Filter Index 的假陽性。當 Bloom Filter 發生假陽性時, Hudi 需要確定該 Record Key 是否真的存在。這個操作需要讀取檔案裡的實際資料一條一條做對比,而實際資料量規模很大,這會導致查詢 Record Key 跟 File ID 的對映關係代價非常大,因此造成了索引的效能下滑。
  • 團隊也調研了 Hudi 的另外一種索引方式 Hbase Index。這是一種 HBase 外接儲存系統索引。但由於業務方不希望引入 HBase 這一額外依賴,且擔心運維 Hbase 過程中存在新的問題,認為 Hbase Index 整體不夠輕量,因此在整個業務場景中也無法作為 Bloom Filter 索引的替代。

在這樣的場景下,位元組跳動需要一個更加輕量且高效的索引方式,並且能夠避免在大資料場景下的插入效能問題。

在不斷實踐中,位元組跳動資料湖團隊在邏輯層開發了一種基於雜湊的索引,使得在插入過程中,定位傳入 Record 的待寫入檔案位置資訊時,無需讀歷史的 Record ,並貢獻到了社群的 RFC-29

改造過後,索引層變成了一層簡單的雜湊操作,可以直接通過對索引鍵的雜湊操作來找到檔案所在的位置。

Bucket Index 設計原理

Bucket Index 是一種基於雜湊的索引,借鑑了資料庫裡的 Hash Index。給定 n 個桶, 用 Hash 函式決定某個記錄屬於哪個桶。最終所有分割槽被分成 N 個桶,每個桶對應一個 File Group。

相比較 Bloom Filter Index 來說,Hash Index 在邏輯層面提供了 Record Key 跟 File Group 的對映關係, 不存在假陽性問題。相同 key 的資料一定是落在同一個桶裡面。最終一分割槽內的結構如下,目前一個 Partition 裡面 Bucket 和 File Group 是一一對應的關係。

Bucket Index 資料寫入原理

Bucket Index 的實際寫入流程可以參考下面的過程示意圖。以下面的實時插入場景為例,某業務批次新增了 5 條記錄,並且需要 Upsert 到已有的分割槽 partition=20220203 中,對已有資料根據主鍵 Record 做一個更新,保留最新的資料。 整個過程可以用下面的示意圖表示:

  1. 在建表時先預估表的單個分割槽資料儲存大小,設定一個分桶數 numBuckets。
  2. 在資料插入前,首先生成 n 個 File ID, 將 File ID 的前8位替換成 bucketId 的數字:

00000000-e929-4327-8b0c-7d0d66091321 00000001-e3cd-4756-b311-863803a6cdaf 00000002-c4ed-4418-90d4-6e348f380636 00000003-c7bd-4916-78c5-6g787g090636

  1. 在插入過程中,最重要的一步就是標記每條新插入的記錄屬於哪個檔案 File Group,然後找到對應的 File Group 去更新或者合併。在目前的設計中, 分桶數跟 File Group 是一一對應的對映關係,因此找到每條Record 對應的桶 ID ,即可確定 Record Key跟 File Group的對映關係。 在具體實現中,我們會對更新資料的索引鍵計算雜湊,再對分桶數取模快速定位到每個 Record 對應的桶,整個過程如下面的 Hash 函式所示:
hashKeyFields.hashCode() & Integer.MAX_VALUE) % numBuckets

其中hashKeyFields可以由使用者指定,是Record Key的一個子集,當預設不指定時,會以Record Key本身作為 hash 鍵。在計算好後,每條記錄即可知道即將寫入的桶。 4. 完成資料寫入 經過索引層之後,每條資料都會帶有一個 File ID,引擎會根據 File ID 進行一次 Shuffle,將相同File ID的資料匯入到同一個子任務中。對於 COW 表而言,更新Update 部分需要和已有的 BaseFile 合併生成新的 BaseFile。而 MOR 表將 Update 的資料直接寫入對應 File Group 的 delta log,Insert 部分生成新的 BaseFile,最終完成該批次資料的 Upsert。 由此可見,整個過程中 Bucket Index 不需要對現有的資料進行掃描組成類似 Bloom Filter 一樣的過濾器,因此可以省去整個定位 File Group 的查詢時間,定位 File Group 的時間也不會隨著已有 Record 條數的增加而導致效能下降。同時分桶操作會在每個桶內對分桶列排序,排序後的資料一般能獲得更高的壓縮率,也能節省儲存。

Bucket Index 查詢優化原理

在查詢時,Bucket Index 的查詢優化會充分利用主流計算引擎的特性。例如 Spark 會利用表的 Bucket 分佈做查詢優化,例如提升查詢效能。從 Bucket Index 表中讀取資料時,由於資料分佈已經按照按索引欄位進行聚類和排序。Spark 可以通過在優化器中應用規則來匹配這種模式,來避免一些 Shuffle 操作。 目前的優化規則主要有下面兩種:

  • Bucket Pruning,利用表的 Bucket 分佈對讀取資料進行剪枝。 例如,如下的T1表的 bucket column 為 city ,在執行下面查詢時: select * from T1 where city = beijing 在針對索引列 city 的某個值進行查詢時,實際上只需讀取一個分桶資料 ( bucket pruning ) , 因為city= beijing 的 Record 在一個分割槽中必然是 Hash 到同一個 Bucket,這樣對於每個分割槽來說,被 Scan 讀取的 Hudi 資料量會大大減少。

  • Bucket Join,利用表的 Bucket 分佈減少 Aggregate/Join 帶來的 shuffle 操作。 對於 Group by 的場景,例如 city 是其中的一個索引列,在進行下面的聚合操作時: select city from T1 group by city 由於相同A的取值必然是落在同一個 bucket 桶中,因此尋找 city='beijing' 時,不需要去訪問其它的 bucket 中去獲得,因此可以在 window 操作時可以省去一次 Shuffle 操作。

同理在 Join 的過程中,假如 T1 是一張 bucket 表並且 bucket index 的索引鍵為 city。而 T2 是一張非 bucket 表。 在 join 時,對於開啟 bucket index 的表 T1 可以避免一次額外的 exchange 操作:

select count(*) from T1 join T2 where T1.city = T2.city

總體而言,所以利用 Bucket Index 的 Hudi 表可以做到提升過濾速度和提高查詢效率。

Bucket Index 的實踐與未來規劃

在實踐過程中,我們也發現了 Bucket Index 的一些實踐建議以及未來的方向。一個關鍵的問題,是如何確定 numBuckets 的值,目前 Bucket Index 的桶數量 ,需要根據預估的資料量提前在建表時進行確定,且建表後不可更改,對於這種限制,我們目前有下面的解決方案。

要設定合理的桶數量,需要預測表的目標大小和未來資料增長情況。

  • 桶的數量過小會降低整體引擎的並行速度,原因不難理解:當資料量增大時, 單個 File Group 對應的資料將增大,而 Hudi 表是以 File Group 為單位將資料切割生成 inputSplit 的,單個 File Group 資料過大將導致查詢併發降低,效能下降。 一般說來建議單個桶的大小控制在 3GB 左右。

  • 同時我們也應該避免桶的數量過多,過多的桶數量則會造成單個桶的資料量太小,造成小檔案情況。基於這樣的範圍,當目標表的大小可以被預測時,我們可以比較容易得到一個合適的 Bucket Index 的桶數量值。

當然,我們也意識到這樣的做法並不是一個靈活的方法。在未來,我們將推出可擴充套件的 Hash Index 桶方法來徹底解決這個問題。我們將支援已有的 Hudi 表在建表後直接擴充套件桶的數量,以避免當業務資料暴增時單個檔案太大,影響查詢以及 Compaction 效能。我們的後續優化將利用 Hashmap 的擴容過程,將分桶數按倍數做到輕量級擴容。當桶的數量在初期預測設定較小時,今後也能動態擴容,可以徹底解決預估桶數量不準確帶來的煩惱。

總結

總結而言,Hudi Bucket Index 作為一種基於雜湊的索引,充分做到了輕量級。對更新資料的主鍵計算雜湊,再對分桶數取模快速定位到 File Group,可以穩定的保證匯入效能。相比 Bloom Filter Index 而言,在大資料匯入 Upsert 場景下有一定的優勢,幫助位元組跳動的業務部門解決了匯入效能隨著資料量增長而下降的難題。 同時在查詢時,也能充分跟計算引擎結合,利用表的 Bucket 分佈對讀取資料進行剪枝,並且利用 Bucket 分佈特性減少 Aggregate/Join 帶來的 Shuffle 操作,提升了查詢效能。 對於 Hudi 使用使用者來說,也不需要改變原有的習慣,只需以插拔的方式指定 Hudi 表想使用的索引型別和桶的數量配置即可,充分做到了易用性與便捷。 目前 Hudi Bucket Index (RFC-29) 的實現已經合入社群最新的主分支,因此,我們非常推薦廣大 Hudi 社群使用者在實踐中使用,並且歡迎各位同行在 Hudi 社群進行技術交流與深入討論,後續我們也會基於 Bucket Index 的反饋持續貢獻新特性。

產品介紹

火山引擎湖倉一體分析服務LAS

湖倉一體分析服務 LAS(Lakehouse Analytics Service)是面向湖倉一體架構的Serverless資料處理分析服務,提供一站式的海量資料儲存計算和互動分析能力,完全相容 Spark、Presto、Flink 生態,幫助企業輕鬆完成資料價值洞察。地址

火山引擎 E-MapReduce

支援構建開源Hadoop生態的企業級大資料分析系統,完全相容開源,提供 Hadoop、Spark、Hive、Flink整合和管理,幫助使用者輕鬆完成企業大資料平臺的構建,降低運維門檻,快速形成大資料分析能力。地址

歡迎關注位元組跳動資料平臺同名公眾號