位元組跳動資料湖技術選型的思考與落地實踐

語言: CN / TW / HK

本文是位元組跳動資料平臺開發套件團隊在 Flink Forward Asia 2021: Flink Forward 峰會上的演講,著重分享了位元組跳動資料湖技術上的選型思考和探索實踐。

文 | Gary Li 位元組跳動資料平臺開發套件團隊高階研發工程師,資料湖開源專案 Apache Hudi PMC Member

隨著 Flink 社群的不斷髮展,越來越多的公司將 Flink 作為首選的大資料計算引擎。位元組跳動也在持續探索 Flink,作為眾多 Flink 使用者中的一員,對於 Flink 的投入也是逐年增加。

位元組跳動資料整合的現狀

在 2018 年,我們基於 Flink 構造了異構資料來源之間批式同步通道,主要用於將線上資料庫匯入到離線數倉,和不同資料來源之間的批式傳輸。

在 2020 年,我們基於 Flink 構造了 MQ-Hive 的實時資料整合通道,主要用於將訊息佇列中的資料實時寫入到 Hive 和 HDFS,在計算引擎上做到了流批統一。

到了 2021 年,我們基於 Flink 構造了實時資料湖整合通道,從而完成了湖倉一體的資料整合系統的構建。

位元組跳動資料整合系統目前支援了幾十條不同的資料傳輸管道,涵蓋了線上資料庫,例如 Mysql Oracle 和 MangoDB;訊息佇列,例如 Kafka RocketMQ;大資料生態系統的各種元件,例如 HDFS、HIVE 和 ClickHouse。

在位元組跳動內部,資料整合系統服務了幾乎所有的業務線,包括抖音、今日頭條等大家耳熟能詳的應用。

整個系統主要分成 3 種模式——批式整合、流式整合和增量整合。

  • 批式整合模式基於 Flink Batch 模式打造,將資料以批的形式在不同系統中傳輸,目前支援了 20 多種不同資料來源型別。
  • 流式整合模式主要是從 MQ 將資料匯入到 Hive 和 HDFS,任務的穩定性和實時性都受到了使用者廣泛的認可。
  • 增量模式即 CDC 模式,用於支援通過資料庫變更日誌 Binlog,將資料變更同步到外部元件的資料庫。

這種模式目前支援 5 種資料來源,雖然資料來源不多,但是任務數量非常龐大,其中包含了很多核心鏈路,例如各個業務線的計費、結算等,對資料準確性要求非常高。

在 CDC 鏈路的整體鏈路比較長。首先,首次匯入為批式匯入,我們通過 Flink Batch 模式直連 Mysql 庫拉取全量資料寫入到 Hive,增量 Binlog 資料通過流式任務匯入到 HDFS。

由於 Hive 不支援更新操作,我們依舊使用了一條基於 Spark 的批處理鏈路,通過 T-1 增量合併的方式,將前一天的 Hive 表和新增的 Binlog 進行合併從而產出當天的 Hive 表。

隨著業務的快速發展,這條鏈路暴露出來的問題也越來越多。

首先,這條基於 Spark 的離線鏈路資源消耗嚴重,每次產出新資料都會涉及到一次全量資料 Shuffle 以及一份全量資料落盤,中間所消耗的儲存以及計算資源都比較嚴重。

同時,隨著位元組跳動業務的快速發展,近實時分析的需求也越來越多。

最後,整條鏈路流程太長,涉及到 Spark 和 Flink 兩個計算引擎,以及 3 個不同的任務型別,使用者使用成本和學習成本都比較高,並且帶來了不小的運維成本。

為了解決這些問題,我們希望對增量模式做一次徹底的架構升級,將增量模式合併到流式整合中,從而可以擺脫對 Spark 的依賴,在計算引擎層面做到統一。

改造完成後,基於 Flink 的資料整合引擎就能同時支援批式、流式和增量模式,幾乎可以覆蓋所有的資料整合場景。

同時,在增量模式上,提供和流式通道相當的資料延遲,賦予使用者近實時分析能力。在達到這些目標的同時,還可以進一步降低計算成本、提高效率。

 

經過一番探索,我們關注到了正在興起的資料湖技術。

關於資料湖技術選型的思考

我們的目光集中在了 Apache 軟體基金會旗下的兩款開源資料湖框架 Iceberg 和 Hudi 中。

Iceberg 和 Hudi 兩款資料湖框架都非常優秀。但兩個專案被建立的目的是為了解決不同的問題,所以在功能上的側重點也有所不同:

  • Iceberg:核心抽象對接新的計算引擎的成本比較低,並且提供先進的查詢優化功能和完全的 schema 變更。
  • Hudi:更注重於高效率的 Upsert 和近實時更新,提供了 Merge On Read 檔案格式,以及便於搭建增量 ETL 管道的增量查詢功能。

一番對比下來,兩個框架各有千秋,並且離我們想象中的資料湖最終形態都有一定距離,於是我們的核心問題便集中在了以下兩個問題:

哪個框架可以更好的支援我們 CDC 資料處理的核心訴求?

哪個框架可以更快速補齊另一個框架的功能,從而成長為一個通用並且成熟的資料湖框架?

經過多次的內部討論,我們認為:Hudi 在處理 CDC 資料上更為成熟,並且社群迭代速度非常快,特別是最近一年補齊了很多重要的功能,與 Flink 的整合也愈發成熟,最終我們選擇了 Hudi 作為我們的資料湖底座。

01 - 索引系統

我們選擇 Hudi,最為看重的就是 Hudi 的索引系統。

這張圖是一個有索引和沒有索引的對比。

在 CDC 資料寫入的過程中,為了讓新增的 Update 資料作用在底表上,我們需要明確知道這條資料是否出現過、出現在哪裡,從而把資料寫到正確的地方。在合併的時候,我們就可以只合並單個檔案,而不需要去管全域性資料。

如果沒有索引,合併的操作只能通過合併全域性資料,帶來的就是全域性的 shuffle。

在圖中的例子中,沒有索引的合併開銷是有索引的兩倍,並且如果隨著底表資料量的增大,這個效能差距會呈指數型上升。

所以,在位元組跳動的業務資料量級下,索引帶來的效能收益是非常巨大的。

Hudi 提供了多種索引來適配不同的場景,每種索引都有不同的優缺點,索引的選擇需要根據具體的資料分佈來進行取捨,從而達到寫入和查詢的最優解。

下面舉兩個不同場景的例子。

1、日誌資料去重場景

在日誌資料去重的場景中,資料通常會有一個 create_time 的時間戳,底表的分佈也是按照這個時間戳進行分割槽,最近幾小時或者幾天的資料會有比較頻繁的更新,但是更老的資料則不會有太多的變化。

冷熱分割槽的場景就比較適合布隆索引、帶 TTL 的 State 索引和雜湊索引。

2、CDC 場景

第二個例子是一個數據庫匯出的例子,也就是 CDC 場景。這個場景更新資料會隨機分佈,沒有什麼規律可言,並且底表的資料量會比較大,新增的資料量通常相比底表會比較小。

在這種場景下,我們可以選用雜湊索引、State 索引和 Hbase 索引來做到高效率的全域性索引。

這兩個例子說明了不同場景下,索引的選擇也會決定了整個表讀寫效能。Hudi 提供多種開箱即用的索引,已經覆蓋了絕大部分場景,使用者使用成本非常低。

02 - Merge On Read 表格式

除了索引系統之外,Hudi 的 Merge On Read 表格式也是一個我們看重的核心功能之一。這種表格式讓實時寫入、近實時查詢成為了可能。

在大資料體系的建設中,寫入引擎和查詢引擎存在著天然的衝突:

寫入引擎更傾向於寫小檔案,以行存的資料格式寫入,儘可能避免在寫入過程中有過多的計算包袱,最好是來一條寫一條。

查詢引擎則更傾向於讀大檔案,以列存的檔案格式儲存資料,比如說 parquet 和 orc,資料以某種規則嚴格分佈,比如根據某個常用欄位進行排序,從而做到可以在查詢的時候,跳過掃描無用的資料,來減少計算開銷。

為了在這種天然的衝突下找到最佳的取捨,Hudi 支援了 Merge On Read 的檔案格式。

MOR 格式中包含兩種檔案:一種是基於行存 Avro 格式的 log 檔案,一種是基於列存格式的 base 檔案,包括 Parquet 或者 ORC。

log 檔案通常體積較小,包含了新增的更新資料。base 檔案體積較大,包含了所有的歷史資料。

寫入引擎可以低延遲的將更新的資料寫入到 log 檔案中。

查詢引擎在讀的時候將 log 檔案與 base 檔案進行合併,從而可以讀到最新的檢視;compaction 任務定時觸發合併 base 檔案和 log 檔案,避免 log 檔案持續膨脹。在這個機制下,Merge On Read 檔案格式做到了實時寫入和近實時查詢。

03 - 增量計算

索引系統和 Merge On Read 格式給實時資料湖打下了非常堅實的基礎,增量計算則是這個基礎之上的 Hudi 的又一個亮眼功能:

增量計算賦予了 Hudi 類似於訊息佇列的能力。使用者可以通過類似於 offset 的時間戳,在 Hudi 的時間線上拉取一段時間內的新增資料。

在一些資料延遲容忍度在分鐘級別的場景中,基於 Hudi 可以統一 Lambda 架構,同時服務於實時場景和離線場景,在儲存上做到流批一體。

位元組跳動內部場景實踐思考

在選擇了基於 Hudi 的資料湖框架後,我們基於位元組跳動內部的場景,打造定製化落地方案。我們的目標是通過 Hudi 來支援所有帶 Update 的資料鏈路:

需要高效率且低成本的 Upsert

支援高吞吐

端到端的資料可見性控制在 5-10 分鐘以內

目標明確後,我們開始了對 Hudi Flink Writer 進行了測試。這個圖是 Hudi on Flink Writer 的架構:一條新的資料進來之後,首先會經過一個索引層,從而找到它需要去的地方。

  • State 索引中儲存了所有主鍵和檔案 ID 的一一對映關係,對於 Update 資料,會找到其所存在的檔案 ID,對於 Insert 資料,索引層會給他指定一個新的檔案 ID,或者是歷史檔案中的小檔案,讓其填充到小檔案中,從而避免小檔案問題。
  • 經過索引層之後,每條資料都會帶有一個檔案 ID,Flink 會根據檔案 ID 進行一次 shuffle,將相同檔案 ID 的資料匯入到同一個子任務中,同時可以避免多個任務寫入同一個檔案的問題。
  • 寫入子任務中有一個記憶體緩衝區,用於儲存當前批次的所有資料,當 Checkpoint 觸發時,子任務緩衝區的資料會被傳入 Hudi Client 中,Client 會去執行一些微批模式的計算操作,比如 Insert/Upsert/Insert overwrite 等,每種操作的計算邏輯不同,比如說 Insert 操作,會生成一個新的檔案,Upsert 操作可能會和歷史檔案做一次合併。
  • 待計算完成後,將處理好的資料寫入到 HDFS 中,並同時收集元資料。
  • Compaction 任務為流任務的一部分,會定時的去輪訓 Hudi 的時間線,檢視是否有 Compaction 計劃存在,如果有 Compaction 計劃,會通過額外的 Compaction 運算元來執行。

在測試過程中,我們遇到了以下幾個問題:

  • 在資料量比較大的場景下,所有的主鍵和檔案 ID 的對映關係都會存在 State 中,State 的體積膨脹的非常快,帶來了額外的儲存開銷,並且有時會造成 Checkpoint 超時的問題。
  • 第二個問題是,由於 Checkpoint 期間,Hudi Client 操作比較重,比如說和底層的 base 檔案進行合併,這種操作涉及到了歷史檔案的讀取,去重,以及寫入新的檔案,如果遇到 HDFS 的抖動,很容易出現 Checkpoint 超時的問題
  • 第三個問題是,Compaction 任務作為流式任務的一部分,任務啟動後資源就不可調節,如果需要調節,只能重啟整個任務,開銷比較大,如果不能靈活調節 Compaction 任務,就可能會出現 Compaction 運算元空跑導致資源浪費,或者資源不足導致任務失敗的情況

為了解決這些問題,我們開始針對我們的場景進行了定製化的優化

位元組跳動的定製化優化技術方案

01 - 索引層

索引的目的就是找到當前這條資料所在的檔案地點,存在 State 中的話每條資料都涉及到一次 State 的讀和寫,在資料量大的場景下,所帶來的計算和儲存開銷都是比較大的。

位元組跳動內部開發了一種基於雜湊的索引,可以通過直接對主鍵的雜湊操作來找到檔案所在的位置,這種方式在非分割槽表下可以做到全域性索引,繞過了對 State 的依賴,改造過後,索引層變成了一層簡單的雜湊操作。

02 - 寫入層

早期的 Hudi 寫入和 Spark 強繫結,在 2020 年底,Hudi 社群對底層的 Hudi Client 進行了拆分,並且支援了 Flink 引擎,這種改造方式是將 Spark RDD 的操作變成了一個 List 的操作,所以底層還是一個批式操作,對於 Flink 來說,每一次 Checkpoint 期間所需要做的計算邏輯是類似於 Spark RDD 的,相當於是執行了一次批式的操作,計算包袱是比較大的。

寫入層的具體流程是:一條資料經過索引層後,來到了寫入層,資料首先會在 Flink 的記憶體緩衝區積攢,同時通過記憶體監控來避免記憶體超出限制導致任務失敗,到了 Checkpoint 的時候,資料會被匯入到 Hudi Client,然後 Hudi Client 會通過 Insert,Append,Merge 等操作計算最終的寫入資料,計算完成後將新的檔案寫入到 HDFS 並同時回收元資料。

我們的核心目標在於如何讓這種微批的寫入模式更加的流式化,從而降低 Checkpoint 期間的計算負擔。

在表結構上,我們選擇了與流式寫入更加匹配的 Merge on Read 格式,寫入的運算元只負責對於 log 檔案的追加寫入,不做任何別的額外的操作,例如和 base 檔案進行合併。

在記憶體上,我們將第一層 Flink 的記憶體緩衝區去掉,直接把記憶體緩衝區建立在了 hudi client 中,在資料寫入的同時進行記憶體監控避免記憶體超出限制的情況,我們將寫入 hdfs 的操作和 Checkpoint 進行了解耦,任務執行過程中,每一小批資料就會寫入 HDFS 一次,由於 HDFS 支援追加寫操作,這種形式也不會帶來小檔案的問題,從而將 Checkpoint 儘可能的輕量化,避免 HDFS 抖動和計算量過大帶來的 Checkpoint 超時的問題。

03- Compaction 層

Compaction 任務本質上是一個批任務,所以需要和流式寫入進行拆分,目前 Hudi on Flink 支援了非同步執行 Compaction 的操作,我們的線上任務全部使用了這種模式。

在這種模式下,流式任務可以專注於寫入,提升吞吐能力和提高寫入的穩定性,批式的 Compaction 任務可以流式任務解耦,彈性伸縮高效的利用計算資源,專注於資源利用率和節約成本。

在這一系列的優化過後,我們在一個 2 百萬 rps 的 Kafka 資料來源上進行了測試,使用了 200 個併發匯入到 Hudi。和之前相比,Checkpoint 耗時從 3-5 分鐘降低到了 1 分鐘以內,HDFS 抖動帶來的任務失敗率也大幅度下降由於 Checkpoint 耗時降低,實際用於資料處理的時間變得更多了,資料吞吐量翻了一倍,同時 State 的儲存開銷也降到了最低。

這是最終的 CDC 資料匯入流程圖

首先,不同的資料庫會將 Binlog 傳送到訊息佇列中,Flink 任務會將所有資料轉換成 HoodieRecord 格式,然後通過雜湊索引找到對應的檔案 ID,通過一層對檔案 ID 的 shuffle 後,資料到達了寫入層,寫入運算元以追加寫的形式將資料頻繁的寫入到 HDFS 中,Checkpoint 觸發後,Flink 會將所有的元資料收集到一起,並寫入到 hudi 的元資料系統中,這裡就標誌了一個 Commit 提交完成,一個新的 Commit 會隨之開始。

使用者可以通過 Flink Spark Presto 等查詢引擎,近實時的查詢已經提交完成的資料。

資料湖平臺側託管的 Compaction 服務會定時提交 Flink Batch 模式的 Compaction 任務,對 Hudi 表進行壓縮操作,這個過程對使用者無感知並且不影響寫入任務。

我們這一整套解決方案也會貢獻給社群,感興趣的同學可以關注 Hudi 社群最新的進展

流式資料湖整合框架的典型落地場景

流式資料湖整合框架改造完成後,我們找到了一些典型的落地場景:

應用最普遍的就是將線上資料庫匯入到離線數倉進行分析的場景,和之前的 Spark 離線鏈路相比:端到端的資料延遲從一個小時以上降低到了 5-10 分鐘,使用者可以進行近實時的資料分析操作。

在資源利用率方面,我們模擬了一個 Mysql 匯入離線數倉進行分析的場景,將 Flink 流式匯入 Hudi 和 Spark 離線合併的方案進行了對比,在使用者小時級查詢的場景下,端到端的計算資源大約節約了 70%左右。

在位元組跳動 EB 級資料量的數倉場景下,這種資源利用率的提升所帶來的收益是非常巨大的。

對於基於訊息佇列和 Flink 構建實時數倉的使用者來說,他們可以把不同數倉層級的實時資料匯入到 Hudi,這類資料 update 的情況很多,所以相較於 Hive,Hudi 可以提供高效率且低成本的 Upsert 操作,從而使用者可以對於全量資料進行近實時查詢,避免了一次去重的操作。

這是一個 Flink 雙流 Join 的場景,很多 Flink 的使用者會使用雙流 Join 來進行實時的欄位拼接,在使用這個功能的時候,使用者通常會開一個時間視窗,然後將這個時間視窗中來自不同資料來源的資料拼接起來,這個欄位拼接功能也可以在 Hudi 的層面實現。

我們正在探索一個功能,在 Flink 中只將不同 Topic 的資料 Union 在一起,然後通過 Hudi 的索引機制,將相同主鍵的資料都寫入到同一個檔案當中,然後通過 Compaction 的操作,將資料進行拼接。

這種方式的優點在於,我們可以通過 Hudi 的索引機制來進行全域性欄位拼接,不會受到一個視窗的限制。

整個拼接邏輯通過 HoodiePayload 實現,使用者可以簡單的繼承 HoodiePayload,然後來開發自己的自定義的拼接邏輯,拼接的時機可以是 Compaction 任務,也可以是 Merge on Read 近實時查詢,使用者可以根據需求場景,靈活的使用計算資源。但是相比 Flink 雙流 Join,這種模式會有一個缺點,就是實時性和易用性上要差一些。

結語

在這一系列的工作過後,我們對資料湖的未來滿懷期待,同時也設立的明確的目標。

首先,我們希望將 Hudi 作為所有 CDC 資料來源的底層儲存,完全替換掉基於 Spark 的離線合併方案,通過資料整合引擎流式匯入,將近實時離線分析的能力帶給所有的線上資料庫。

接著,增量 ETL 場景也是一個重要的落地場景,對於資料延遲容忍度在分鐘級的場景,Hudi 可以作為統一儲存同時服務於實時鏈路和離線鏈路,從而將傳統的數倉 Lambda 架構升級到真正意義上的流批一體。

最後,我們希望建設一個智慧資料湖平臺,這個平臺會託管所有資料湖的運維管理,達到自我治理的一個狀態,使用者則不需要再為運維而煩惱。

同時,我們希望提供自動化調優的功能,基於資料的分佈找到最佳的配置引數,例如之前提到的不同索引之間的效能取捨問題,我們希望通過演算法來找到最佳的配置,從而提高資源利用率,並降低使用者的使用門檻。

極佳的使用者體驗也是我們的追求之一,我們希望在平臺側做到一鍵入湖入倉,大大降低使用者的開發成本。

資料湖整合技術也已經通過火山引擎大資料研發治理套件DataLeap對外開放。

歡迎關注位元組跳動資料平臺同名公眾號