字節跳動湖平台在批計算和特徵場景的實踐

語言: CN / TW / HK

本文整理自火山引擎雲原生計算研發工程師劉緯在 DataFunCon 2022 上的演講。隨着業務的發展,字節跳動特徵存儲已到達 EB 級別,日均增量 PB 級別,每天訓練資源量級為百萬 Core。隨之而來的是內部業務方對原始數據存儲、特徵回填需求、降低成本、提升速度等需求的期待。本次分享將圍繞問題背景、選型& Iceberg 簡介、基於 Iceberg 的實踐及未來規劃展開。

 

作者:劉緯

整理:王吉東,於惠

問題背景

用户使用流程

如我們所知,字節跳動是一家擅長做 A/B test 的公司。以特徵工程調研場景為例,流程如下:

  • 首先由算法工程師進行在線特徵抽取;
  • 將抽取到的特徵,使用 Protobuf 的格式按行存至 HDFS;出於存儲成本的考量,一般只存儲抽取後的特徵,而不存儲原始特徵
  • 將 HDFS 存儲的特徵交由字節自研的分佈式框架( Primus )進行併發讀取,並進行編碼和解碼操作,進而發送給訓練器。
  • 由訓練器對模型進行高效訓練如果模型訓練效果符合算法工程師的預期,説明該調研特徵生效,進而算法工程師對調研特徵進行回溯,通過 Spark 作業將特徵回填到歷史數據中,分享給其他算法工程師,進而迭代更多的優質模型如果模型訓練效果不符合算法工程師的預期,則調研特徵不對原有特徵集合產生影響

業務規模

公司龐大的業務規模,帶來了巨大的計算和存儲體量:

  • 特徵存儲總量達 EB 級;
  • 單表特徵最大可達百 PB 級(如廣告業務);
  • 單日特徵存儲增量達 PB 級;
  • 單日訓練資源開銷達 PB 級。

 

遇到的問題

當特徵調研場景疊加巨大的數據體量,將會遇到以下困難:

  • 特徵存儲空間佔用較大
  • 樣本讀放大,不能列裁剪,很難落特徵進樣本;
  • 樣本寫放大,COW 很難做特徵回溯調研;
  • 不支持特徵 Schema 校驗;
  • 平台端到端體驗差,用户使用成本高

 

選型& Iceberg簡介

在特徵調研場景下,行存儲是個低效的存儲方式;因此,我們選擇 Iceberg 存儲方式來解決上述問題。

整體分層

Apache Iceberg 是由 Netflix 公司推出的一種用於大型分析表的高性能通用表格式實現方案。

如上圖所示,系統分成引擎層、表格式層、文件格式層、緩存加速層、對象存儲層。圖中可以看出,Iceberg 所處的層級和 Hudi,DeltaLake 等工具一樣,都是表格式層:

  • 向上提供統一的操作 API
  • Iceberg 定義表元數據信息以及 API 接口,包括表字段信息、表文件組織形式、表索引信息、表統計信息以及上層查詢引擎讀取、表寫入文件接口等,使得 Spark, Flink 等計算引擎能夠同時高效使用相同的表。
  • 下層有 parquet、orc、avro 等文件格式可供選擇
  • 下接緩存加速層,包括開源的 Alluxio、火山引擎自研的 CFS 等;CFS 全稱是Cloud File System, 是面向火山引擎和專有云場景下的大數據統一存儲服務,支持高性能的緩存和帶寬加速,提供兼容 HDFS API 的訪問接口。
  • 最底層的實際物理存儲,可以選擇對象存儲,比如 AWS S3,火山引擎的 TOS,或者可以直接使用 HDFS。

通過上圖可以比較清晰地瞭解到,Iceberg 這個抽象層最大的優勢在於:將底層文件的細節對用户屏蔽,將上層的計算與下層的存儲進行分離,從而在存儲和計算的選擇上更為靈活,用户可以通過表的方式去訪問,無需關心底層文件的信息。

Iceberg簡介

Iceberg 架構

Iceberg 的本質是一種文件的組織形式。如上圖所示,包括多級的結構:

  • Iceberg Catalog:用於保存表和存儲路徑的映射關係,其核心信息是保存 Version 文件所在的目錄。Iceberg Catalog 共有8種實現方式,包括 HadoopCatalog,HiveCatalog,JDBCCatalog,RestCatalog 等不同的實現方式,其底層存儲信息會略有不同;RestCatalog 方式無需對接任何一種具體的存儲,而是通過提供 Restful API 接口,藉助 Web 服務實現 Catalog,進一步實現了底層存儲的解耦。
  • Metadata File:用來存儲表的元數據信息,包括表的 Schema、分區信息、快照信息( Snapshot )等。Snapshot 是快照信息,表示表在某一時刻的狀態;用户每次對 Table 進行一次寫操作,均會生成一個新的 SnapShot。 Manifestlist 是清單文件列表,用於存儲單個快照的清單文件。Manifestfile 是存儲的每個數據文件對應的清單文件,用來追蹤這個數據文件的位置、分區信息、列的最大最小值、是否存在 Null 值等統計信息。
  • Data File 是存儲的數據,數據將以 Parquet、Orc、Avro 等文件格式進行存儲。

 

Iceberg 特點

  • SchemaEvolution:Iceberg 表結構的更新,本質是內在元信息的更新,因此無需進行數據遷移或數據重寫。Iceberg 保證模式的演化( Schema Evolution )是個獨立的、沒有副作用的操作流程,不會涉及到重寫數據文件等操作。
  • Time travel:用户可任意讀取歷史時刻的相關數據,並使用完全相同的快照進行重複查詢。
  • MVCC:Iceberg 通過 MVCC 來支持事務,解決讀寫衝突的問題;
  • 開放標準:Iceberg 不綁定任何計算引擎,擁有完全獨立開放的標準,易於拓展。

 

Iceberg 讀寫流程和提交流程

1.讀寫

  • 每次 Iceberg 的寫操作,只有在發生 Commit 之後,才是可讀的;如有多個線程同時在讀,一部分線程在寫,就只有在 Commit 全部數據之後,對用户進行的讀操作才能被用户的讀線程所看到,從而實現讀寫分離。
  • 例如上圖中,在對 S3 進行寫操作的時候,S2、S1 的讀操作是不受影響的;此時 S3 無法被讀到,只有Commit 之後 S3 才會被讀到。此時 Current Snapshot 會指向 S3。
  • Iceberg 默認從最新 Current Snapshot 讀取數據;如果讀更早的數據,可通過指定對應的 Snapshot ID ,實現數據回溯。

2.事務性提交

  • 寫操作:記錄當前元數據的版本——Base Version,創建新的元數據以及 Manifest 文件,原子性將 Base Version 替換為新的版本。
  • 原子性替換:原子性替換保證了線性歷史,通過元數據管理器所提供的能力,以及 HDFS 或本地文件系統所提供的原子化 Rename 能力實現。
  • 衝突解決:基於樂觀鎖實現,每一個 Writer 假定當前沒有其他的寫操作,即對錶的 Write 進行原子性的 Commit,若遇到衝突則基於當前最新的元數據進行重試。

 

分區裁剪

  • 直接定位到 Parquet 文件,無需調用文件系統的 List 操作;
  • Partition 的存儲方式對用户透明,用户在修改 Partition 定義時,Iceberg 可以自動地修改存儲佈局,無需用户重複操作。

 

謂詞下推

Iceberg 會在兩個層面實現謂詞下推:

  • 在 Snapshot 層面,過濾掉不滿足條件的 Data File;
  • 在 Data File 層面,過濾掉不滿足條件的數據。

其中,Snapshot 層面的過濾操作為 Iceberg 所特有,正是利用到 Manifest 文件中的元數據信息,逐字段實現文件的篩選,大大地減少了文件的掃描量。而同為Table Format 產品、在字節其他業務產線已投入使用的 Hudi,雖然同樣具備分區剪枝功能,但是尚不具備謂詞下推功能。

 

基於 Iceberg 的實踐

Hudi、Iceberg、DeltaLake 這三款 TableFormat 產品各有優劣,然而並沒有任何一款產品能夠直接滿足我們的使用場景需求;考慮到 Iceberg 具備良好的 Schema Evolution 能力、支持下推,且無需綁定計算引擎等優點,因此字節選擇使用 Iceberg 作為數據湖工具。

整體架構

  • 在字節的整體架構中,最上層是業務層,包含抖音,頭條,小説等字節絕大部分業務線,以及火山引擎雲原生計算等相關 ToB 產品(如 Seveless Spark 等);
  • 在平台層,使用 Global Lake Service 給業務方提供簡單易用的 UI 和訪問控制等功能;
  • 在框架層,使用 Spark 作為特徵處理框架(包含預處理和特徵調研等),使用字節自研的 Primus 分佈式框架作為訓練框架,使用 Flink 實現流式訓練;
  • 在格式層,選擇 Parquet 作為文件格式,使用 Iceberg 作為表格式;
  • 最下層是調度器層和存儲層。選擇 Yarn 和 K8S 作為調度器;存儲層一般選擇 HDFS 進行存儲,對於 ToB 產品,則使用 CFS 進行存儲。

Data-Parquet

結合上圖可以看出,列存儲在特徵調研場景存在以下優勢:

  • 可選擇指定列進行讀取:有效減少讀放大問題,同時易於增列,即新增一列的時候,只需單獨寫入一列即可,元數據信息會記錄每一列所在的磁盤位置;
  • 壓縮:同一列的數據格式相同,因此具有更好的壓縮比;同一列的數據名稱相同,因此無需進行宂餘字符串存儲;
  • 謂詞下推:對每一列數據記錄相應的統計信息(如 Min,Max 等),因此可以實現部分的謂詞下推。

為了解決業務方的痛點問題,我們改成使用 Parquet 列存儲格式,以降低數據的存儲成本;同時由於 Parquet 選列具備下推到存儲層的特性,在訓練時只需讀取模型所需要的特徵即可,從而降低訓練時序列化、反序列化的成本,提升訓練的速度。

然而使用 Parquet 列存儲,帶來優點的同時也相應地帶來了一些問題:

  • 原來的行存儲方式是基於 Protobuf 定義的半結構化數據,無需預先定義 Schema;然而使用 Parquet 之後,需要預先指定 Schema 才能進行數據的存取;這樣在特徵新增和淘汰的時候,Schema 的更新將會變成一個棘手的問題。
  • 此外,Parquet 不支持數據回填;如果需要要回填比較長的數據,就需要將數據全量讀取,增加新列,再全量寫回。這樣一方面會造成大量計算資源的浪費,另一方面會帶來 Overwrite 操作,導致正在進行訓練的任務由於文件被替換而失敗。

為了解決以上兩個問題,我們引入了Iceberg 來支持 SchemaEvolution,特徵回填以及併發讀寫。

特徵回填

COW

從上圖可以看出,使用 Iceberg COW( Copy on Write )方式進行特徵回填,通過 BackFill 任務將原快照中的數據全部讀出,然後添加新列寫出到新的 Data File 中,並生成新的快照。這種方式的缺點在於,僅僅新增一列數據的寫入,卻需要整體數據全部讀出後再全部寫回,浪費了大量的計算資源和存儲資源;因此,我們基於開源的 Iceberg 自研了一種 MOR( Merge on Read )的 BackFill 方案。

MOR

從上圖可以看出,在 MOR 方案中,我們仍然需要一個 BackFill 任務來讀取原始的 Data File 文件;所不同的是,我們只需讀取少數需要的字段。例如對 A 列通過一些計算邏輯生成 C 列,那麼 BackFill 任務只需從 Snapshot1 中讀取 A 列的數據,且只需將 C 列的 Update 文件寫入 Snapshot2 即可

隨着新增列的增多,需要將 Update 文件合併到 Data File 裏面;為此,可以進一步提供一種 Compaction 邏輯,即通過讀取舊的 Data File 和 Update File,合併生成新的 Data File。實現細節如下:

  • 舊 Data File 和 Update File 增加一個主鍵,每個文件按照主鍵排序;
  • 讀取舊 Data File 時根據用户選擇的列,分析具體需要哪些 Update File 和 Data File;
  • 根據舊 Data File 中 Min-Max 值去選擇對應的 Update File。

由此可以看出,MOR 的本質是對多個 Data File 文件和 Update File 文件進行多路歸併,歸併的順序由 SEQ 決定,SEQ 大的數據(表明數據越新)會覆蓋 SEQ 小的數據。

兩種特徵回填方式對比

  • COW:讀寫放大嚴重、存儲空間浪費、讀取邏輯簡單、寫入耗費更多資源、讀取無需額外計算資源;
  • MOR:沒有讀寫放大、節省存儲空間、讀取邏輯複雜、寫入耗費較少資源、絕大多數場景,不需要額外資源;

相比於 COW 方式的全量讀取和寫入,MOR 的優勢在於只讀取需要的列,同樣也只寫入更新的列,因此避免了讀寫放大的問題,節省大量計算資源,並大大降低讀寫 I/O;相比 COW 方式每次 COW 翻倍的情況,MOR 只需存儲新增列,大量節省了存儲資源。

對於模型訓練任務而言,大多數模型訓練只需要用到少量的列,因此大量的線上模型都無需 MOR 操作,涉及開銷可忽略不計;對於少數的特徵調研模型,只需讀取模型對應的 Update File 即可,因此帶來的讀取資源增加也非常有限。

 

其他

除了上面提到的藉助 Compaction 提高讀性能以及分析特徵刪除場景外,還提供了以下幾個服務:

  • ExpirationSnapshot Expiration: 用於處理過期的 Snapshots。過期 Snapshots 不及時清理,會導致元數據文件堆積,從而帶來文件膨脹問題,會給算法工程師帶來困擾,因此需要服務定期做一些清理。我們通過平台化改造實現 Snapshots 文件的統一維護和清理;Data Expiration: 大部分數據是有新鮮度和時效性的,因此用户可設置數據保存多久後被清理。
  • CleanUp:由於一些事務的失敗,或者一些快照的過期,導致文件在元數據文件中已經不再被引用,需要定期清理掉。
  • Roll-Back:對於一些在 Table 中非預期數據或者 Schema 變更,希望將其回滾到之前穩定的 Snapshot;結合平台的事件管理器,可以比較容易的實現這一功能。
  • Statistics: 用來實現一些湖平台可視化信息的展示,以及後端服務給業務帶來的價值歸納。

 

平台化改造

這裏分享下自字節內部實現的平台化工作。上圖是批式特徵存儲的列表,藉助站內實現的湖平台化工作,業務部門可以輕鬆實現特徵的可視化操作,以及信息概覽的獲取。

下圖是一張特徵表樣例,通過這張表可以直觀地看到存儲空間的使用、文件數的統計、記錄數統計、特徵統計等信息。

 

未來規劃

規劃重點

在未來規劃中,計劃逐步支持以下功能:

  • 湖冷熱分層:在成本優化方面,可以通過湖冷熱分層實現。前文提到對於保存超過一定時間的數據,可以直接刪除;然而在某些特定的場景下,這些數據還會被使用,只是訪問頻率較低;因此未來考慮增加數據湖冷熱分層功能,幫助用户降低成本。
  • 物化視圖:在查詢優化方面,通過物化視圖提升查詢性能。該功能是源於 ToB 客户的真實場景需求,目前這部分的優化工作正處於商業化交付流程中,大家可以後續在火山引擎官網相關的產品上進行體驗。
  • Self-Optimize:在體驗優化方面,實現 Self-Optimize,例如前文提到的一些數據維護的優化等。
  • 支持更多引擎:為了增加生態的豐富度, Iceberg 在未來也會逐漸更多的引擎。

 

整體平台架構總覽

整體平台架構以計算引擎產品為核心,包含兩部分服務:

  • 雲原生管理控制:Quota 服務、租户管理服務、運行時管理、生態整合服務、交付部署服務、網關服務;
  • 雲原生運維平台:組件服務生命週期管理、Helm Chart 管理、日誌&審計、監控報警、容災&高可用;

如前文所述,該平台不僅支持公司內部的業務,還會支持一定的 ToB 的業務,以上在字節內部實現的功能,以及未來規劃的能力也會基於內外一致的思路進行演進;最終都會落地到上圖中涉及到的幾款雲原生計算產品中,如流式計算 Flink 版,雲原生消息引擎 BMQ,雲搜索服務 OpenSearch,大數據文件存儲 CloudFS 等。以上均為 Serverless 的全託管產品,讓用户更聚焦於自己的業務邏輯,減少數據運維帶來的困擾。

如有需求,歡迎填寫問卷,參與技術交流「鏈接」