位元組跳動湖平臺在批計算和特徵場景的實踐

語言: CN / TW / HK

本文整理自火山引擎雲原生計算研發工程師劉緯在 DataFunCon 2022 上的演講。隨著業務的發展,位元組跳動特徵儲存已到達 EB 級別,日均增量 PB 級別,每天訓練資源量級為百萬 Core。隨之而來的是內部業務方對原始資料儲存、特徵回填需求、降低成本、提升速度等需求的期待。本次分享將圍繞問題背景、選型& Iceberg 簡介、基於 Iceberg 的實踐及未來規劃展開。

 

作者:劉緯

整理:王吉東,於惠

問題背景

使用者使用流程

如我們所知,位元組跳動是一家擅長做 A/B test 的公司。以特徵工程調研場景為例,流程如下:

  • 首先由演算法工程師進行線上特徵抽取;
  • 將抽取到的特徵,使用 Protobuf 的格式按行存至 HDFS;出於儲存成本的考量,一般只儲存抽取後的特徵,而不儲存原始特徵
  • 將 HDFS 儲存的特徵交由位元組自研的分散式框架( Primus )進行併發讀取,並進行編碼和解碼操作,進而傳送給訓練器。
  • 由訓練器對模型進行高效訓練如果模型訓練效果符合演算法工程師的預期,說明該調研特徵生效,進而演算法工程師對調研特徵進行回溯,通過 Spark 作業將特徵回填到歷史資料中,分享給其他演算法工程師,進而迭代更多的優質模型如果模型訓練效果不符合演算法工程師的預期,則調研特徵不對原有特徵集合產生影響

業務規模

公司龐大的業務規模,帶來了巨大的計算和儲存體量:

  • 特徵儲存總量達 EB 級;
  • 單表特徵最大可達百 PB 級(如廣告業務);
  • 單日特徵儲存增量達 PB 級;
  • 單日訓練資源開銷達 PB 級。

 

遇到的問題

當特徵調研場景疊加巨大的資料體量,將會遇到以下困難:

  • 特徵儲存空間佔用較大
  • 樣本讀放大,不能列裁剪,很難落特徵進樣本;
  • 樣本寫放大,COW 很難做特徵回溯調研;
  • 不支援特徵 Schema 校驗;
  • 平臺端到端體驗差,使用者使用成本高

 

選型& Iceberg簡介

在特徵調研場景下,行儲存是個低效的儲存方式;因此,我們選擇 Iceberg 儲存方式來解決上述問題。

整體分層

Apache Iceberg 是由 Netflix 公司推出的一種用於大型分析表的高效能通用表格式實現方案。

如上圖所示,系統分成引擎層、表格式層、檔案格式層、快取加速層、物件儲存層。圖中可以看出,Iceberg 所處的層級和 Hudi,DeltaLake 等工具一樣,都是表格式層:

  • 向上提供統一的操作 API
  • Iceberg 定義表元資料資訊以及 API 介面,包括表字段資訊、表文件組織形式、表索引資訊、表統計資訊以及上層查詢引擎讀取、表寫入檔案介面等,使得 Spark, Flink 等計算引擎能夠同時高效使用相同的表。
  • 下層有 parquet、orc、avro 等檔案格式可供選擇
  • 下接快取加速層,包括開源的 Alluxio、火山引擎自研的 CFS 等;CFS 全稱是Cloud File System, 是面向火山引擎和專有云場景下的大資料統一儲存服務,支援高效能的快取和頻寬加速,提供相容 HDFS API 的訪問介面。
  • 最底層的實際物理儲存,可以選擇物件儲存,比如 AWS S3,火山引擎的 TOS,或者可以直接使用 HDFS。

通過上圖可以比較清晰地瞭解到,Iceberg 這個抽象層最大的優勢在於:將底層檔案的細節對使用者遮蔽,將上層的計算與下層的儲存進行分離,從而在儲存和計算的選擇上更為靈活,使用者可以通過表的方式去訪問,無需關心底層檔案的資訊。

Iceberg簡介

Iceberg 架構

Iceberg 的本質是一種檔案的組織形式。如上圖所示,包括多級的結構:

  • Iceberg Catalog:用於儲存表和儲存路徑的對映關係,其核心資訊是儲存 Version 檔案所在的目錄。Iceberg Catalog 共有8種實現方式,包括 HadoopCatalog,HiveCatalog,JDBCCatalog,RestCatalog 等不同的實現方式,其底層儲存資訊會略有不同;RestCatalog 方式無需對接任何一種具體的儲存,而是通過提供 Restful API 介面,藉助 Web 服務實現 Catalog,進一步實現了底層儲存的解耦。
  • Metadata File:用來儲存表的元資料資訊,包括表的 Schema、分割槽資訊、快照資訊( Snapshot )等。Snapshot 是快照資訊,表示表在某一時刻的狀態;使用者每次對 Table 進行一次寫操作,均會生成一個新的 SnapShot。 Manifestlist 是清單檔案列表,用於儲存單個快照的清單檔案。Manifestfile 是儲存的每個資料檔案對應的清單檔案,用來追蹤這個資料檔案的位置、分割槽資訊、列的最大最小值、是否存在 Null 值等統計資訊。
  • Data File 是儲存的資料,資料將以 Parquet、Orc、Avro 等檔案格式進行儲存。

 

Iceberg 特點

  • SchemaEvolution:Iceberg 表結構的更新,本質是內在元資訊的更新,因此無需進行資料遷移或資料重寫。Iceberg 保證模式的演化( Schema Evolution )是個獨立的、沒有副作用的操作流程,不會涉及到重寫資料檔案等操作。
  • Time travel:使用者可任意讀取歷史時刻的相關資料,並使用完全相同的快照進行重複查詢。
  • MVCC:Iceberg 通過 MVCC 來支援事務,解決讀寫衝突的問題;
  • 開放標準:Iceberg 不繫結任何計算引擎,擁有完全獨立開放的標準,易於拓展。

 

Iceberg 讀寫流程和提交流程

1.讀寫

  • 每次 Iceberg 的寫操作,只有在發生 Commit 之後,才是可讀的;如有多個執行緒同時在讀,一部分執行緒在寫,就只有在 Commit 全部資料之後,對使用者進行的讀操作才能被使用者的讀執行緒所看到,從而實現讀寫分離。
  • 例如上圖中,在對 S3 進行寫操作的時候,S2、S1 的讀操作是不受影響的;此時 S3 無法被讀到,只有Commit 之後 S3 才會被讀到。此時 Current Snapshot 會指向 S3。
  • Iceberg 預設從最新 Current Snapshot 讀取資料;如果讀更早的資料,可通過指定對應的 Snapshot ID ,實現資料回溯。

2.事務性提交

  • 寫操作:記錄當前元資料的版本——Base Version,建立新的元資料以及 Manifest 檔案,原子性將 Base Version 替換為新的版本。
  • 原子性替換:原子性替換保證了線性歷史,通過元資料管理器所提供的能力,以及 HDFS 或本地檔案系統所提供的原子化 Rename 能力實現。
  • 衝突解決:基於樂觀鎖實現,每一個 Writer 假定當前沒有其他的寫操作,即對錶的 Write 進行原子性的 Commit,若遇到衝突則基於當前最新的元資料進行重試。

 

分割槽裁剪

  • 直接定位到 Parquet 檔案,無需呼叫檔案系統的 List 操作;
  • Partition 的儲存方式對使用者透明,使用者在修改 Partition 定義時,Iceberg 可以自動地修改儲存佈局,無需使用者重複操作。

 

謂詞下推

Iceberg 會在兩個層面實現謂詞下推:

  • 在 Snapshot 層面,過濾掉不滿足條件的 Data File;
  • 在 Data File 層面,過濾掉不滿足條件的資料。

其中,Snapshot 層面的過濾操作為 Iceberg 所特有,正是利用到 Manifest 檔案中的元資料資訊,逐欄位實現檔案的篩選,大大地減少了檔案的掃描量。而同為Table Format 產品、在位元組其他業務產線已投入使用的 Hudi,雖然同樣具備分割槽剪枝功能,但是尚不具備謂詞下推功能。

 

基於 Iceberg 的實踐

Hudi、Iceberg、DeltaLake 這三款 TableFormat 產品各有優劣,然而並沒有任何一款產品能夠直接滿足我們的使用場景需求;考慮到 Iceberg 具備良好的 Schema Evolution 能力、支援下推,且無需繫結計算引擎等優點,因此位元組選擇使用 Iceberg 作為資料湖工具。

整體架構

  • 在位元組的整體架構中,最上層是業務層,包含抖音,頭條,小說等位元組絕大部分業務線,以及火山引擎雲原生計算等相關 ToB 產品(如 Seveless Spark 等);
  • 在平臺層,使用 Global Lake Service 給業務方提供簡單易用的 UI 和訪問控制等功能;
  • 在框架層,使用 Spark 作為特徵處理框架(包含預處理和特徵調研等),使用位元組自研的 Primus 分散式框架作為訓練框架,使用 Flink 實現流式訓練;
  • 在格式層,選擇 Parquet 作為檔案格式,使用 Iceberg 作為表格式;
  • 最下層是排程器層和儲存層。選擇 Yarn 和 K8S 作為排程器;儲存層一般選擇 HDFS 進行儲存,對於 ToB 產品,則使用 CFS 進行儲存。

Data-Parquet

結合上圖可以看出,列儲存在特徵調研場景存在以下優勢:

  • 可選擇指定列進行讀取:有效減少讀放大問題,同時易於增列,即新增一列的時候,只需單獨寫入一列即可,元資料資訊會記錄每一列所在的磁碟位置;
  • 壓縮:同一列的資料格式相同,因此具有更好的壓縮比;同一列的資料名稱相同,因此無需進行冗餘字串儲存;
  • 謂詞下推:對每一列資料記錄相應的統計資訊(如 Min,Max 等),因此可以實現部分的謂詞下推。

為了解決業務方的痛點問題,我們改成使用 Parquet 列儲存格式,以降低資料的儲存成本;同時由於 Parquet 選列具備下推到儲存層的特性,在訓練時只需讀取模型所需要的特徵即可,從而降低訓練時序列化、反序列化的成本,提升訓練的速度。

然而使用 Parquet 列儲存,帶來優點的同時也相應地帶來了一些問題:

  • 原來的行儲存方式是基於 Protobuf 定義的半結構化資料,無需預先定義 Schema;然而使用 Parquet 之後,需要預先指定 Schema 才能進行資料的存取;這樣在特徵新增和淘汰的時候,Schema 的更新將會變成一個棘手的問題。
  • 此外,Parquet 不支援資料回填;如果需要要回填比較長的資料,就需要將資料全量讀取,增加新列,再全量寫回。這樣一方面會造成大量計算資源的浪費,另一方面會帶來 Overwrite 操作,導致正在進行訓練的任務由於檔案被替換而失敗。

為了解決以上兩個問題,我們引入了Iceberg 來支援 SchemaEvolution,特徵回填以及併發讀寫。

特徵回填

COW

從上圖可以看出,使用 Iceberg COW( Copy on Write )方式進行特徵回填,通過 BackFill 任務將原快照中的資料全部讀出,然後新增新列寫出到新的 Data File 中,並生成新的快照。這種方式的缺點在於,僅僅新增一列資料的寫入,卻需要整體資料全部讀出後再全部寫回,浪費了大量的計算資源和儲存資源;因此,我們基於開源的 Iceberg 自研了一種 MOR( Merge on Read )的 BackFill 方案。

MOR

從上圖可以看出,在 MOR 方案中,我們仍然需要一個 BackFill 任務來讀取原始的 Data File 檔案;所不同的是,我們只需讀取少數需要的欄位。例如對 A 列通過一些計算邏輯生成 C 列,那麼 BackFill 任務只需從 Snapshot1 中讀取 A 列的資料,且只需將 C 列的 Update 檔案寫入 Snapshot2 即可

隨著新增列的增多,需要將 Update 檔案合併到 Data File 裡面;為此,可以進一步提供一種 Compaction 邏輯,即通過讀取舊的 Data File 和 Update File,合併生成新的 Data File。實現細節如下:

  • 舊 Data File 和 Update File 增加一個主鍵,每個檔案按照主鍵排序;
  • 讀取舊 Data File 時根據使用者選擇的列,分析具體需要哪些 Update File 和 Data File;
  • 根據舊 Data File 中 Min-Max 值去選擇對應的 Update File。

由此可以看出,MOR 的本質是對多個 Data File 檔案和 Update File 檔案進行多路歸併,歸併的順序由 SEQ 決定,SEQ 大的資料(表明資料越新)會覆蓋 SEQ 小的資料。

兩種特徵回填方式對比

  • COW:讀寫放大嚴重、儲存空間浪費、讀取邏輯簡單、寫入耗費更多資源、讀取無需額外計算資源;
  • MOR:沒有讀寫放大、節省儲存空間、讀取邏輯複雜、寫入耗費較少資源、絕大多數場景,不需要額外資源;

相比於 COW 方式的全量讀取和寫入,MOR 的優勢在於只讀取需要的列,同樣也只寫入更新的列,因此避免了讀寫放大的問題,節省大量計算資源,並大大降低讀寫 I/O;相比 COW 方式每次 COW 翻倍的情況,MOR 只需儲存新增列,大量節省了儲存資源。

對於模型訓練任務而言,大多數模型訓練只需要用到少量的列,因此大量的線上模型都無需 MOR 操作,涉及開銷可忽略不計;對於少數的特徵調研模型,只需讀取模型對應的 Update File 即可,因此帶來的讀取資源增加也非常有限。

 

其他

除了上面提到的藉助 Compaction 提高讀效能以及分析特徵刪除場景外,還提供了以下幾個服務:

  • ExpirationSnapshot Expiration: 用於處理過期的 Snapshots。過期 Snapshots 不及時清理,會導致元資料檔案堆積,從而帶來檔案膨脹問題,會給演算法工程師帶來困擾,因此需要服務定期做一些清理。我們通過平臺化改造實現 Snapshots 檔案的統一維護和清理;Data Expiration: 大部分資料是有新鮮度和時效性的,因此使用者可設定資料儲存多久後被清理。
  • CleanUp:由於一些事務的失敗,或者一些快照的過期,導致檔案在元資料檔案中已經不再被引用,需要定期清理掉。
  • Roll-Back:對於一些在 Table 中非預期資料或者 Schema 變更,希望將其回滾到之前穩定的 Snapshot;結合平臺的事件管理器,可以比較容易的實現這一功能。
  • Statistics: 用來實現一些湖平臺視覺化資訊的展示,以及後端服務給業務帶來的價值歸納。

 

平臺化改造

這裡分享下自位元組內部實現的平臺化工作。上圖是批式特徵儲存的列表,藉助站內實現的湖平臺化工作,業務部門可以輕鬆實現特徵的視覺化操作,以及資訊概覽的獲取。

下圖是一張特徵表樣例,通過這張表可以直觀地看到儲存空間的使用、檔案數的統計、記錄數統計、特徵統計等資訊。

 

未來規劃

規劃重點

在未來規劃中,計劃逐步支援以下功能:

  • 湖冷熱分層:在成本優化方面,可以通過湖冷熱分層實現。前文提到對於儲存超過一定時間的資料,可以直接刪除;然而在某些特定的場景下,這些資料還會被使用,只是訪問頻率較低;因此未來考慮增加資料湖冷熱分層功能,幫助使用者降低成本。
  • 物化檢視:在查詢優化方面,通過物化檢視提升查詢效能。該功能是源於 ToB 客戶的真實場景需求,目前這部分的優化工作正處於商業化交付流程中,大家可以後續在火山引擎官網相關的產品上進行體驗。
  • Self-Optimize:在體驗優化方面,實現 Self-Optimize,例如前文提到的一些資料維護的優化等。
  • 支援更多引擎:為了增加生態的豐富度, Iceberg 在未來也會逐漸更多的引擎。

 

整體平臺架構總覽

整體平臺架構以計算引擎產品為核心,包含兩部分服務:

  • 雲原生管理控制:Quota 服務、租戶管理服務、執行時管理、生態整合服務、交付部署服務、閘道器服務;
  • 雲原生運維平臺:元件服務生命週期管理、Helm Chart 管理、日誌&審計、監控報警、容災&高可用;

如前文所述,該平臺不僅支援公司內部的業務,還會支援一定的 ToB 的業務,以上在位元組內部實現的功能,以及未來規劃的能力也會基於內外一致的思路進行演進;最終都會落地到上圖中涉及到的幾款雲原生計算產品中,如流式計算 Flink 版,雲原生訊息引擎 BMQ,雲搜尋服務 OpenSearch,大資料檔案儲存 CloudFS 等。以上均為 Serverless 的全託管產品,讓使用者更聚焦於自己的業務邏輯,減少資料運維帶來的困擾。

如有需求,歡迎填寫問卷,參與技術交流「連結」