位元組跳動基於 Iceberg 的海量特徵儲存實踐

語言: CN / TW / HK

背景

位元組跳動特徵儲存痛點

當前行業內的特徵儲存整體流程主要分為以下四步:

特徵儲存的整體流程

  1. 業務線上進行特徵模組抽取;
  1. 抽取後的特徵以行的格式儲存在 HDFS ,考慮到成本,此時不儲存原始特徵,只存抽取後的特徵;
  1. 位元組跳動自研的分散式框架會將儲存的特徵併發讀取並解碼傳送給訓練器;
  1. 訓練器負責高速訓練。

位元組跳動特徵儲存總量為 EB 級別,每天的增量達到 PB 級別,並且每天用於訓練的資源也達到了百萬核心,所以整體上位元組的儲存和計算的體量都是非常大的。在如此的體量之下,我們遇到了以下三大痛點:

  1. 特徵抽取週期長。 在特徵抽取上,當前採用的是線上抽取的方式。大量的演算法工程師,每天都在進行大量的特徵相關的試驗。在當前的線上抽取模式下,如果有演算法工程師想要調研一個新的特徵,那麼他首先需要定義特徵的計算方式,等待線上模組的統一上線,然後需要等線上抽取的特徵積累到一定的量級後才可以進行訓練,從而判斷這個特徵是否有效果。這個過程通常需要2周甚至更長的時間。並且,如果發現特徵的計算邏輯寫錯或想要更改計算邏輯,則需重複上述過程。線上特徵抽取導致當前位元組特徵調研的效率非常低。基於當前的架構,離線特徵調研的成本又非常高。
  1. 特徵儲存空間佔用大。 位元組的特徵儲存當前是以行存的形式進行儲存。如果基於當前的行存做特徵調研,則需要基於原來的路徑額外生成新的資料集。一方面需要額外的空間對新的資料集進行儲存,另一方面還需要額外的計算資源去讀取原來的全量資料生成新的資料,且很難做資料的管理和複用。行存對於特徵儲存來說,也很難進行優化,佔用空間較大。
  1. 模型訓練 頻寬大,資料讀取有瓶頸。 位元組當前將每個業務線的絕大部分特徵都儲存在一個路徑下,訓練的時候會直接基於這個路徑進行訓練。對於每個模型,訓練所需的特徵是不一樣的,每個業務線可能存有上萬個特徵,而大部分模型訓練往往只需要幾百個特徵,但因為特徵是以行存格式進行儲存,所以訓練時需要將上萬特徵全部讀取後,再在記憶體中進行過濾,這就使得模型訓練的頻寬需求非常大,資料的讀取成為了整個訓練的瓶頸。

基於痛點的需求梳理

基於上述問題,我們與業務方一同總結了若干需求:

  1. 儲存原始特徵:由於線上特徵抽取在特徵調研上的低效率,我們期望能夠儲存原始特徵;
  1. 離線調研能力:在原始特徵的基礎上,可以進行離線調研,從而提升特徵調研效率;
  1. 支援特徵回填:支援特徵回填,在調研完成後,可以將歷史資料全部刷上調研好的特徵;
  1. 降低儲存成本:充分利用資料分佈的特殊性,降低儲存成本,騰出資源來儲存原始特徵;
  1. 降低訓練成本:訓練時只讀需要的特徵,而非全量特徵,降低訓練成本;
  1. 提升訓練速度:訓練時儘量降低資料的拷貝和序列化反序列化開銷。

位元組跳動海量特徵儲存解決方案

在位元組的整體架構中,最上層是業務層,包括抖音、頭條、小說等位元組絕大部分業務線;

其下我們通過平臺層,給業務同學提供簡單易用的 UI 和訪問控制等功能;

在框架層,我們使用 Spark 作為特徵處理框架(包括預處理和離線特徵調研等),位元組自研的 Primus 作為訓練框架;

在格式層,我們選用 Parquet 作為檔案格式,Iceberg 作為表格式;

最下層是排程器 Yarn & K8s 以及儲存 HDFS。

下面我們重點針對格式層進行詳細介紹。

技術選型

為了滿足業務方提到的6個需求,我們首先想到的是通過 Parquet 列存的格式,降低行存的儲存成本,節省的空間可用來儲存原始特徵。同時由於 Parquet 選列可以下推到儲存層的特性,在訓練時可以只讀需要的特徵,從而降低訓練時反序列化的成本,提升訓練的速度。

但是使用 Parquet 引入了額外的問題,原來的行存是基於 Protobuf 定義的半結構化資料,不需要預先定義 Schema,而使用 Parquet 以後,我們需要先知道 Schema,然後才能進行資料的存取,那麼在特徵新增和淘汰時,Schema 的更新就是一個很難解決的問題。Parquet 並不支援資料回填,如果要回填歷史幾年的資料,就需要將資料全量讀取,增加新列,再全量寫回,這一方面會浪費大量的計算資源,另一方面做特徵回填時的 overwrite 操作,會導致當前正在進行訓練的任務由於檔案被替換而失敗。

為了解決這幾個問題,我們引入了 Iceberg 來支援模式演進、特徵回填和併發讀寫。

Iceberg 是適用於大型資料集的一個開源表格式,具備模式演進、隱藏分割槽&分割槽演進、事務、MVCC、計算儲存引擎解耦等特性,這些特性匹配了我們所有的需求。因此,我們選擇了 Iceberg。

整體上 Iceberg 是一個分層的結構,snapshot 層儲存了當前表的所有快照;manifest list 層儲存了每個快照包含的 manifest 雲資料,這一層的用途主要是為了多個 snapshot 可以複用下一層的 manifest; manifest 層,儲存了下層 Data Files 元資料;最下面的 Data File 是就是實際的資料檔案。通過這樣的多層結構,Iceberg 可以支援上述包括模式演進等幾個特性。

下面我們 來一一 介紹 Iceberg 如何支援這些功能。

基於 Iceberg 的特徵儲存實踐經驗

併發讀寫

在併發讀取方面,Iceberg 是基於快照的讀取,對 Iceberg 的每個操作都會生成新的快照,不影響正在讀取的快照,從而保證讀寫互不影響。

在併發寫入方面,Iceberg 是採用樂觀併發的方式,利用HDFS mv 的原子性語義保證只有一個能寫入成功,而其他的併發寫入會被檢查是否有衝突,若沒有衝突,則寫入下一個 snapshot。

模式演進

Iceberg 的模式演進原理

我們知道,Iceberg 元資料和 Parquet 元資料都有 Column,而中間的對映關係,是通過 ID 欄位來進行一對一對映。

例如上面左圖中,Iceberg 和 Parquet 分別有 ABC 三列,對應 ID 1、2、3。那最終讀取出的 Dataframe 就是 和 Parquet 中一致包含 ID 為1、2、3的 ABC 三列。而當我們對左圖進行兩個操作,刪除舊的 B 列,寫入新的 B 列後, Iceberg 對應的三列 ID 會變成1、3、4,所以右圖中讀出來的 Dataframe,雖然也是 ABC 三列,但是這個 B 列的 ID 並非 Parquet 中 B 列的 ID,因此最終實際的資料中,B 列為空值。

特徵回填

  • 寫時複製

如上圖所示,COW 方式的特徵回填通過一個 Backfill 任務將原快照中的資料全部讀出,然後寫入新列,再寫出到新的 Data File 中,並生成新的快照。

這種方式的缺點在於雖然我們只需要寫一列資料,但是需要將整體資料全部讀出,再全部寫回,不僅浪費了大量的計算資源用來對整個 Parquet 檔案進行編碼解碼,還浪費了大量的 IO 來讀取全量資料,且浪費了大量的儲存資源來儲存重複的 ABC 列。

因此我們基於開源 Iceberg 自研了 MOR 的 Backfill 方案。

  • 讀時合併

如上圖所示,在 MOR 方案中,我們仍然需要一個 Backfill 任務來讀取原始的 Data File 檔案,但是這裡我們只讀取需要的欄位。比如我們只需要 A 列通過某些計算邏輯生成 D 列,那麼 Backfill 任務則只讀取 A 的資料,並且 Snapshot2 中只需要寫包含 D 列的 update 檔案。隨著新增列的增多,我們也需要將 Update 檔案合併回 Data File 檔案中。

為此,我們又提供了 Compaction 邏輯,即讀取舊的 Data File 和 Update File,併合併成一個單獨的 Data File。

MOR原理如上圖,假設原來有一個邏輯 Dataframe 是由兩個 Data File 構成, 現在需要回填一個 ColD 的內容。我們會寫入一個包含 ColD 的 Update File,這樣 Snapshot2 中的邏輯 Dataframe 就會包含ABCD 四列。

實現細節: - - Data File 和 Update File 都需要一個主鍵,並且每個檔案都需要按照主鍵排序,在這個例子中是 ID; - 讀取時,會根據使用者選擇的列,分析具體需要哪些 Update File 和 Data File; - 根據 Data File 中主鍵的 min-max 值去選擇與該 Data File 相對應的 Update File; - MOR 整個過程是多個 Data File 和 Update File 多路歸併的過程; - 歸併的順序由 SEQ 來決定,SEQ 大的資料會覆蓋 SEQ 小的資料。

  • COW 與 MOR 特性 比較

相比於 COW 方式全量讀取和寫入所有列,MOR 的優勢是隻讀取需要的列,也只寫入更新的列,沒有讀寫放大問題。在計算上節省了大量的資源,讀寫的 IO 也大大降低,相比 COW 方式每次 COW 都翻倍的情況, MOR 只需要儲存新增列,也大大避免了儲存資源浪費。

考慮到效能的開銷,我們需要定期 Compaction,Compaction 是一個比較重的操作,和 COW 相當。但是 Compaction 是一個非同步的過程,可以在多次 MOR 後進行一次 Compaction。那麼一次 Compaction 的開銷就可以攤銷到多次 MOR 上,例如10次 COW 和10次 MOR + 1次 Compaction 相比,儲存和讀寫成本都從原來的 10x 降到當前的 2x 。

MOR 的實現成本較高,但這可以通過良好的設計和大量的測試來解決。

而對於模型訓練來說,由於大多數模型訓練只需要自己的列,所以大量的線上模型都不需要走 MOR 的邏輯,可以說基本沒有開銷。而少數的調研模型,往往只需讀自己的 Update File 而不用讀其他的 Update File ,所以整體上讀取的額外資源也並未增加太多。

訓練優化

從行存改為 Iceberg 後,我們也在訓練上也做了大量的優化。

在我們的原始架構中,分散式訓練框架並不解析實際的資料內容,而是直接以行的形式把資料透傳給訓練器,訓練器在內部進行反序列化、選列等操作。

原始架構

引入 Iceberg 後,我們要拿到選列帶來的 CPU 和 IO 收益就需要將選列下推到儲存層。最初為了保證下游訓練器感知不到,我們在訓練框架層面,將選列反序列化後,構造成原來的 ROW 格式,傳送給下游訓練器。相比原來,多了一層序列化反序列化的開銷。

這就導致遷移到 Iceberg 後,整體訓練速度反而變慢,資源也增加了。

列式改造

為了提升訓練速度,我們通過向量化讀取的方式,將 Iceberg 資料直接讀成 Batch 資料,傳送給訓練器,這一步提升了訓練速度,並降低了部分資源消耗。

向量化讀取

為了達到最優效果,我們與訓練器團隊合作,直接修改了訓練器內部,使訓練器可以直接識別 Arrow 資料,這樣我們就實現了從 Iceberg 到訓練器端到端的 Arrow 格式打通,這樣只需要在最開始反序列化為 Arrow ,後續的操作就完全基於 Arrow 進行,從而降低了序列化和反序列化開銷,進一步提升訓練速度,降低資源消耗。

Arrow

優化收益

最終,我們達到了最初的目標,取得了離線特徵工程的能力

在儲存成本上,普遍降低了40%以上 在同樣 訓練速度下,CPU 降低了13%,網路 IO 降低40%

未來規劃

未來,我們規劃支援以下4種能力:

  1. Upsert 的能力,支援使用者的部分資料迴流;
  1. 物化檢視的能力,支援使用者在常用的資料集上建立物化檢視,提高讀取效率;
  1. Data Skipping 能力,進一步優化資料排布,下推更多邏輯,進一步優化 IO 和計算資源;
  1. 基於 Arrow 的資料預處理能力,向用戶提供良好的資料處理介面,同時將預處理提前預期,進一步加速後續的訓練。

位元組跳動基礎架構批式計算團隊持續招聘中,包括 Spark、Ray、ML等方向,支撐位元組所有業務線,海量的資料和業務場景等你來探索。 - 工作地點:北京/杭州/新加坡 - 聯絡方式:歡迎新增微信 bupt001,或傳送簡歷至郵件 [email protected]