位元組跳動基於 Apache Hudi 構建實時資料湖平臺實踐

語言: CN / TW / HK

一篇關於位元組跳動基於 Apache Hudi 的實時資料平臺的分享。

本篇內容包含四個部分,首先介紹一下 Hudi,其次介紹位元組的實時資料湖平臺的應用場景;然後針對應用場景,位元組做的優化和新特性;最後介紹未來規劃。

Hudi 是一個流式資料湖平臺,提供 ACID 功能,支援實時消費增量資料、離線批量更新資料,並且可以通過 Spark、Flink、Presto 等計算引擎進行查詢。

Hudi 表由 timeline 和 file group兩大項構成。Timeline 由一個個 commit 構成,一次寫入過程對應時間線中的一個 commit,記錄本次寫入修改的檔案。

相較於傳統數倉,Hudi 要求每條記錄必須有唯一的主鍵,並且同分區內,相同主鍵只存在在一個 file group中。底層儲存由多個 file group 構成,有其特定的 file ID。File group 內的檔案分為 base file 和 log file, log file 記錄對 base file 的修改,通過 compaction 合併成新的 base file,多個版本的 base file 會同時存在。

Hudi 表分為 COW 和 MOR兩種型別,

COW 表適用於離線批量更新場景,對於更新資料,會先讀取舊的 base file,然後合併更新資料,生成新的 base file。 MOR 表適用於實時高頻更新場景,更新資料會直接寫入 log file 中,讀時再進行合併。為了減少讀放大的問題,會定期合併 log file 到 base file 中。

對於更新資料,Hudi 通過索引快速定位資料所屬的 file group。目前 Hudi 已支援 Bloom Filter Index、Hbase index 以及 Bucket Index。其中 Bucket Index 尚未合併到主分支。

位元組跳動基於 Hudi 的實時資料湖平臺,通過秒級資料可見支援實時數倉。除了提供 Hudi 社群的所有功能外,還支援下述第三部分介紹的特性。

一個典型的 pipeline 是MySQL 側的 binlog 生產到 Kafka。

實時場景直接通過 Spark Streaming 或 Flink 消費這部分更新資料,寫入資料湖,供下游業務使用。 批量場景會先將 binlog 通過 dump service 儲存到 HDFS上,再按照小時/天級粒度更新到資料湖中。

在位元組的推薦場景中,為服務離線對資料分析挖掘需求,需要將資料從類 Hbase的儲存匯出到離線儲存中,並且可以提供高效的 OLAP 訪問。因此我們基於資料湖構建BigTable 的 CDC。

此外,在特徵工程和模型訓練場景中,需要將推薦系統 Serving 時獲得的資料和端上埋點資料這兩類實時資料流通過主鍵合併到一起,作為機器學習樣本。因此我們希望可以藉助資料湖的能力,低成本的批量新增特徵列。

數倉 backfill 場景中,需要對歷史全量資料進行部分行、列的更新,在 Hive 模式下,需要將增量資料和歷史全量進行 join,重新生成全量資料。其中,部分表的存量資料到達百 PB 級別。我們通過資料湖極大的減少了計算資源消耗,提升了端到端的效能。

數倉場景中,對於一張底層分析表,往往是通過多個數據源的資料組合拼接而成,每個資料來源都包含相同的主鍵列,和其他不同的屬性列。在傳統數倉場景中,需要先將每個資料來源資料 dump 成 Hive 表,然後再將多張 Hive 表按主鍵 join 後生成最終的完整 schema 的大表,延遲可到達天級別。我們通過資料湖使實時成為可能,並且提供列拼接能力,使下游資料分析效能大幅提升。

接下來介紹第三部分,針對上述場景,位元組做的優化與新特性。

Hive Metastore 是元資料的事實標準,但是基於目錄的元資料管理方式太粗,沒有辦法滿足資料湖以 commit 的形式管理元資料的需求。我們提出了適用於資料湖場景下的元資料管理系統 Hudi Metastore,並基於此設計了湖倉統一的元資料管理系統。

整個架構分為三部分引擎層、元資料層、儲存層。元資料層對外提供統一的元資料檢視,與 HMS 完全相容,可無縫對接多個計算引擎。元資料層的 Catalog Service 接收來自引擎層的訪問請求,按規則路由到不同的 Metastore 上。元資料層通過 Catalog Service 遮蔽底層多 Metastore 的異構性。

Hudi Metastore 作為資料湖元資料管理系統,支援 commit 形式的元資料管理,基於樂觀鎖和 CAS 支援併發更新;持久化元資料的 Snapshot,通過快取常被訪問的元資料、索引資訊,提供高效查詢;提供分割槽裁剪功能。整體設計

底層儲存可插拔,不依賴某個特定的儲存系統,可以是 HDFS、KV、MySQL 輕量且易於擴充套件,服務無狀態,支援水平擴充套件;儲存可通過拆庫/表的方式縱向擴充套件 與 Hive Metastore 相容

我們基於 Hudi Metastore和樂觀鎖的假設,實現了併發寫入,並且支援靈活的行列衝突檢查策略。衝突檢查會在 instant 狀態變換的兩個節點進行,一個是 requested 轉 inflight 狀態,一個是 inflight 轉 completed 狀態。其中,後者狀態變換時,會進行加鎖操作,以實現版本隔離。

衝突檢查即是對 instant 建立到狀態變化的過程中其他已經完成/正在執行的 instant 之間的進行衝突檢查,檢查策略分為行列兩種,

行級別的衝突檢查即是不能同時有兩個 instant 往同一個 file group 寫。 列級別的衝突檢查即是可以有兩個 instant 往同一個 file group 寫,但是兩個 instant 寫入的schema 不可以存在交集。 每個 instant 只寫入 schema 中的部分列,log 檔案中的資料只包含 schema 中的部分 Compaction 按主鍵拼接不同列下的資料,Parquet 檔案中儲存的資料擁有完整的 schema

Hudi 目前的兩種索引方式,Bloom Filter Index 在大資料場景下,假陽性的問題會導致查詢效率變差,而 Hbase Index 會引入額外的外部系統,從而提升運維代價。因此,我們希望能有一個輕量且高效的索引方式。

Bucket Index 是一種基於雜湊的索引。每個分割槽被分成 N 個桶,每個桶對應一個 file group。對於更新資料,對更新資料的主鍵計算雜湊,再對分桶數取模快速定位到 file group,提升匯入實時性。

現有的計算引擎大都會利用表的 Bucket 分佈做查詢優化,提升查詢效能。優化規則包含兩種:

Bucket Pruning,利用表的 Bucket 分佈對讀取資料進行剪枝。 Bucket Join,利用表的 Bucket 分佈減少 Join/Aggregate 帶來的 shuffle 操作。

Hudi 要求每條資料都有唯一主鍵和比較列,用於資料更新時定位 file group 和新舊資料比較。資料定位 file group 過程需要先根據索引構建主鍵到 file group 的對映關係,然後與更新資料按照主鍵進行 join,從而找到每條更新資料對應的 file group。

對於日誌場景,無確定的主鍵,並且使用者查詢也僅僅是對某些列進行 count 操作,因此更新資料只需要直接追加到任一檔案末尾即可,也就是 Append 模式。為此,我們提出了 NonIndex方案,無需指定主鍵和比較列,更新過程也無需構建主鍵到 file group 的對映關係,避免了 join,提升了匯入的實時性。

後續我們會將新特性逐步貢獻到社群。

最後,位元組跳動資料引擎團隊持續招人中,團隊支撐位元組所有業務線的數倉,打造業界領先的 PB 級 OLAP引擎。工作地包括:北京/上海/杭州,有興趣的小夥伴歡迎新增微信 minihippo666,或傳送簡歷至郵件 [email protected] [1] ,或直接通過下述二維碼進行投遞,具體職位資訊可通過下述二維碼查詢。