5年迭代5次,抖音推薦系統演進歷程

語言: CN / TW / HK

2021 年,位元組跳動旗下產品總 MAU 已超過 19 億。在以抖音、今日頭條、西瓜影片等為代表的產品業務背景下,強大的推薦系統顯得尤為重要。Flink 提供了非常強大的 SQL 模組和有狀態計算模組。目前在位元組推薦場景,實時簡單計數特徵、視窗計數特徵、序列特徵已經完全遷移到 Flink SQL 方案上。結合 Flink SQL 和 Flink 有狀態計算能力,我們正在構建下一代通用的基礎特徵計算統一架構,期望可以高效支援常用有狀態、無狀態基礎特徵的生產。

業務背景

對於今日頭條、抖音、西瓜影片等位元組跳動旗下產品,基於 Feed 流和短時效的推薦是核心業務場景。而推薦系統最基礎的燃料是特徵,高效生產基礎特徵對業務推薦系統的迭代至關重要。

主要業務場景

02.png

  • 抖音、火山短影片等為代表的短影片應用推薦場景,例如 Feed 流推薦、關注、社交、同城等各個場景,整體在國內大概有 6 億 + 規模 DAU;
  • 頭條、西瓜等為代表的 Feed 資訊流推薦場景,例如 Feed 流、關注、子頻道等各個場景,整體在國內有數億規模 DAU;

業務痛點和挑戰

03.png

目前位元組跳動推薦場景基礎特徵的生產現狀是“百花齊放”。離線特徵計算的基本模式都是通過消費 Kafka、BMQ、Hive、HDFS、Abase、RPC 等資料來源,基於 Spark、Flink 計算引擎實現特徵的計算,而後把特徵的結果寫入線上、離線儲存。各種不同型別的基礎特徵計算散落在不同的服務中,缺乏業務抽象,帶來了較大的運維成本和穩定性問題。

而更重要的是,缺乏統一的基礎特徵生產平臺,使業務特徵開發迭代速度和維護存在諸多不便。如業務方需自行維護大量離線任務、特徵生產鏈路缺乏監控、無法滿足不斷髮展的業務需求等。

04.png

在位元組的業務規模下,構建統一的實時特徵生產系統面臨著較大挑戰,主要來自四個方面:

巨大的業務規模:抖音、頭條、西瓜、火山等產品的資料規模可達到日均 PB 級別。例如在抖音場景下,晚高峰 Feed 播放量達數百萬 QPS,客戶端上報使用者行為資料高達數千萬 IOPS。 業務方期望在任何時候,特徵任務都可以做到不斷流、消費沒有 Lag 等,這就要求特徵生產具備非常高的穩定性。

較高的特徵實時化要求:在以直播、電商、短影片為代表的推薦場景下,為保證推薦效果,實時特徵離線生產的時效性需實現常態穩定於分鐘級別。

更好的擴充套件性和靈活性:隨著業務場景不斷複雜,特徵需求更為靈活多變。從統計、序列、屬性型別的特徵生產,到需要靈活支援視窗特徵、多維特徵等,業務方需要特徵中臺能夠支援逐漸衍生而來的新特徵型別和需求。

業務迭代速度快:特徵中臺提供的面向業務的 DSL 需要足夠場景,特徵生產鏈路儘量讓業務少寫程式碼,底層的計算引擎、儲存引擎對業務完全透明,徹底釋放業務計算、儲存選型、調優的負擔,徹底實現實時基礎特徵的規模化生產,不斷提升特徵生產力;

迭代演進過程

在位元組業務爆發式增長的過程中,為了滿足各式各樣的業務特徵的需求,推薦場景衍生出了眾多特徵服務。這些服務在特定的業務場景和歷史條件下較好支援了業務快速發展,大體的歷程如下:

05.png

推薦場景特徵服務演進歷程

在這其中 2020 年初是一個重要節點,我們開始在特徵生產中引入 Flink SQL、Flink State 技術體系,逐步在計數特徵系統、模型訓練的樣本拼接、視窗特徵等場景進行落地,探索出新一代特徵生產方案的思路。

新一代系統架構

結合上述業務背景,我們基於 Flink SQL 和 Flink 有狀態計算能力重新設計了新一代實時特徵計算方案。新方案的定位是:解決基礎特徵的計算和線上 Serving,提供更加抽象的基礎特徵業務層 DSL 在計算層,我們基於 Flink SQL 靈活的資料處理表達能力,以及 Flink State 狀態儲存和計算能力等技術,支援各種複雜的視窗計算。極大地縮短業務基礎特徵的生產週期,提升特徵產出鏈路的穩定性。新的架構裡,我們將 特徵生產的鏈路分為資料來源抽取 / 拼接、狀態儲存、計算三個階段,Flink SQL 完成特徵資料的抽取和流式拼接,Flink State 完成特徵計算的中間狀態儲存。

有狀態特徵是非常重要的一類特徵,其中最常用的就是帶有各種視窗的特徵,例如統計最近 5 分鐘影片的播放 VV 等。對於視窗型別的特徵在位元組內部有一些基於儲存引擎的方案,整體思路是“輕離線重線上”,即把視窗狀態儲存、特徵聚合計算全部放在儲存層和線上完成。離線資料流負責基本資料過濾和寫入,離線明細資料按照時間切分聚合儲存(類似於 micro batch),底層的儲存大部分是 KV 儲存、或者專門優化的儲存引擎,線上層完成複雜的視窗聚合計算邏輯,每個請求來了之後線上層拉取儲存層的明細資料做聚合計算。

我們新的解決思路是“輕線上重離線”,即把比較重的時間切片明細資料狀態儲存和視窗聚合計算全部放在離線層。視窗結果聚合通過離線視窗觸發機制完成,把特徵結果推到線上 KV 儲存。線上模組非常輕量級,只負責簡單的線上 Serving,極大地簡化了線上層的架構複雜度。在離線狀態儲存層。我們主要依賴 Flink 提供的原生狀態儲存引擎 RocksDB,充分利用離線計算叢集本地的 SSD 磁碟資源,極大減輕線上 KV 儲存的資源壓力。

對於長視窗的特徵(7 天以上視窗特徵),由於涉及 Flink 狀態層明細資料的回溯過程,Flink Embedded 狀態儲存引擎沒有提供特別好的外部資料回灌機制(或者說不適合做)。因此對於這種“狀態冷啟動”場景,我們引入了中心化儲存作為底層狀態儲存層的儲存介質,整體是 Hybrid 架構。例如 7 天以內的狀態儲存在本地 SSD,7~30 天狀態儲存到中心化的儲存引擎,離線資料回溯可以非常方便的寫入中心化儲存。

除視窗特徵外,這套機制同樣適用於其他型別的有狀態特徵(如序列型別的特徵)。

實時特徵分類體系

06.png

整體架構

07.png

帶有視窗的特徵,例如抖音影片最近 1h 的點贊量(滑動視窗)、直播間使用者最近一個 Session 的看播時長(Session 視窗)等;

資料來源層

在新的一體化特徵架構中,我們統一把各種型別資料來源抽象為 Schema Table,這是因為底層依賴的 Flink SQL 計算引擎層對資料來源提供了非常友好的 Table Format 抽象。在推薦場景,依賴的資料來源非常多樣,每個特徵上游依賴一個或者多個數據源。資料來源可以是 Kafka、RMQ、KV 儲存、RPC 服務。對於多個數據源,支援資料來源流式、批式拼接,拼接型別包括 Window Join 和基於 Key 粒度的 Window Union Join,維表 Join 支援 Abase、RPC、HIVE 等。具體每種型別的拼接邏輯如下:

08.png

三種類型的 Join 和 Union 可以組合使用,實現複雜的多資料流拼接。例如 (A union B) Window Join (C Lookup Join D)。

09.png

另外,Flink SQL 支援複雜欄位的計算能力,也就是業務方可以基於資料來源定義的 TableSchema 基礎欄位實現擴充套件欄位的計算。業務計算邏輯本質是一個 UDF,我們會提供 UDF API 介面給業務方,然後上傳 JAR 到特徵後臺載入。另外對於比較簡單的計算邏輯,後臺也支援通過提交簡單的 Python 程式碼實現多語言計算。

業務 DSL

從業務視角提供高度抽象的特徵生產 DSL 語言,遮蔽底層計算、儲存引擎細節,讓業務方聚焦於業務特徵定義。業務 DSL 層提供:資料來源、資料格式、資料抽取邏輯、資料生成特徵型別、資料輸出方式等。

10.png

狀態儲存層

11.png

如上文所述,新的特徵一體化方案解決的主要痛點是:如何應對各種型別(一般是滑動視窗)有狀態特徵的計算問題。對於這類特徵,在離線計算層架構裡會有一個狀態儲存層,把抽取層提取的 RawFeature 按照切片 Slot 儲存起來 (切片可以是時間切片、也可以是 Session 切片等)。切片型別在內部是一個介面型別,在架構上可以根據業務需求自行擴充套件。狀態裡面其實儲存的不是原始 RawFeature(儲存原始的行為資料太浪費儲存空間),而是轉化為 FeaturePayload 的一種 POJO 結構,這個結構裡面支援了常見的各種資料結構型別:

  • Int:儲存簡單的計數值型別 (多維度 counter);
  • HashMap:儲存二維計數值,例如 Action Counter,key 為 target_id,value 為計數值;
  • SortedMap: 儲存 topk 二維計數 ;
  • LinkedList
  • :儲存 id_list 型別資料;
  • HashMap<int, List
  • :儲存二維 id_list;
  • 自定義型別,業務可以根據需求 FeaturePayload 裡面自定義資料型別

狀態層更新的業務介面:輸入是 SQL 抽取 / 拼接層抽取出來的 RawFeature,業務方可以根據業務需求實現 UpdateFeatureInfo 介面對狀態層的更新。對於常用的特徵型別內建實現了 Update 介面,業務方自定義特徵型別可以繼承 Update 介面實現。

/** * 特徵狀態 update 介面 */ public interface FeatureStateApi extends Serializable { /** * 特徵更新介面, 上游每條日誌會提取必要欄位轉換為 fields, 用來更新對應的特徵狀態 * * @param fields * context: 儲存特徵名稱、主鍵 和 一些配置引數 ; * oldFeature: 特徵之前的狀態 * fields: 平臺 / 配置檔案 中的抽取欄位 * @return */ FeaturePayLoad assign(Context context,FeaturePayLoad feature, Map<String, Object> rawFeature); }

當然對於無狀態的 ETL 特徵是不需要狀態儲存層的。

計算層

特徵計算層完成特徵計算聚合邏輯,有狀態特徵計算輸入的資料是狀態儲存層儲存的帶有切片的 FeaturePayload 物件。簡單的 ETL 特徵沒有狀態儲存層,輸入直接是 SQL 抽取層的資料 RawFeature 物件,具體的介面如下:

``` /* * 有狀態特徵計算介面 / public interface FeatureStateApi extends Serializable {

/**
 * 特徵聚合介面,會根據配置的特徵計算視窗, 讀取視窗內所有特徵狀態,排序後傳入該介面
 *
 * @param featureInfos, 包含 2 個 field
 *      timeslot: 特徵狀態對應的時間槽
 *      Feature: 該時間槽的特徵狀態
 * @return
 */    
FeaturePayLoad aggregate(Context context, List<Tuple2<Slot, FeaturePayLoad>> slotStates);

} ```

有狀態特徵聚合介面

``` /* * 無狀態特徵計算介面 / public interface FeatureConvertApi extends Serializable {

/**
* 轉換介面, 上游每條日誌會提取必要欄位轉換為 fields, 無狀態計算時,轉換為內部的 feature 型別 ;
 *
 * @param fields
 *      fields: 平臺 / 配置檔案 中的抽取欄位
 * @return
 */
 FeaturePayLoad convert(Context context,  FeaturePayLoad featureSnapshot, Map<String, Object> rawFeatures);

} ```

無狀態特徵計算介面

另外通過觸發機制來觸發特徵計算層的執行,目前支援的觸發機制主要有:

12.png

業務落地

目前在位元組推薦場景,新一代特徵架構已經在抖音直播、電商、推送、抖音推薦等場景陸續上線了一些實時特徵。主要是有狀態型別的特徵,帶有視窗的一維統計型別、二維倒排拉鍊型別、二維 TOPK 型別、實時 CTR/CVR Rate 型別特徵、序列型別特徵等。

在業務核心指標達成方面成效顯著。在直播場景,依託新特徵架構強大的表達能力上線了一批特徵之後,業務看播核心指標、互動指標收益非常顯著。在電商場景,基於新特徵架構上線了 400+ 實時特徵。其中在直播電商方面,業務核心 GMV、下單率指標收益顯著。在抖音推送場景,基於新特徵架構離線狀態的儲存能力,聚合使用者行為資料然後寫入下游各路儲存,極大地緩解了業務下游資料庫的壓力,在一些場景中 QPS 可以下降到之前的 10% 左右。此外,抖音推薦 Feed、評論等業務都在基於新特徵架構重構原有的特徵體系。

值得一提的是,在電商和抖音直播場景,Flink 流式任務狀態最大已經達到 60T,而且這個量級還在不斷增大。預計不久的將來,單任務的狀態有可能會突破 100T,這對架構的穩定性是一個不小的挑戰。

效能優化

Flink State Cache

目前 Flink 提供兩類 StateBackend:基於 Heap 的 FileSystemStateBackend 和基於 RocksDB 的 RocksDBStateBackend。對於 FileSystemStateBackend,由於資料都在記憶體中,訪問速率很快,沒有額外開銷。而 RocksDBStateBackend 存在查盤、序列化 / 反序列化等額外開銷,CPU 使用量會有明顯上升。在位元組內部有大量使用 State 的作業,對於大狀態作業,通常會使用 RocksDBStateBackend 來管理本地狀態資料。RocksDB 是一個 KV 資料庫,以 LSM 的形式組織資料,在實際使用的過程中,有以下特點:

  1. 應用層和 RocksDB 的資料互動是以 Bytes 陣列的形式進行,應用層每次訪問都需要序列化 / 反序列化;
  1. 資料以追加的形式不斷寫入 RocksDB 中,RocksDB 後臺會不斷進行 Compaction 來刪除無效資料。

業務方使用 State 的場景多是 Get-Update,在使用 RocksDB 作為本地狀態儲存的過程中,出現過以下問題:

  1. 爬蟲資料導致熱 key,狀態會不斷進行更新 (Get-Update),單 KV 資料達到 5MB,而 RocksDB 追加更新的特點導致後臺在不斷進行 Flush 和 Compaction,單 Task 出現慢節點(抖音直播場景)。
  1. 電商場景作業多數為大狀態作業 (目前已上線作業狀態約 60TB),業務邏輯中會頻繁進行 State 操作。在融合 Flink State 過程中發現 CPU 的開銷和原有的基於記憶體或 abase 的實現有 40%~80% 的升高。經優化後,CPU 開銷主要集中在序列化 / 反序列化的過程中。

針對上述問題,可以通過在記憶體維護一個物件 Cache,達到優化熱點資料訪問和降低 CPU 開銷的目的。通過上述背景介紹,我們希望能為 StateBackend 提供一個通用的 Cache 功能,通過 Flink StateBackend Cache 功能設計方案達成以下目標:

  1. 減少 CPU 開銷 : 通過對熱點資料進行快取,減少和底層 StateBackend 的互動次數,達到減少序列化 / 反序列化開銷的目的。
  1. 提升 State 吞吐能力 : 通過增加 Cache 後,State 吞吐能力應比原有的 StateBackend 提供的吞吐能力更高。理論上在 Cache 足夠大的情況下,吞吐能力應和基於 Heap 的 StateBackend 近似。
  1. Cache 功能通用化 : 不同的 StateBackend 可以直接適配該 Cache 功能。目前我們主要支援 RocksDB,未來希望可以直接提供給別的 StateBackend 使用,例如 RemoteStateBackend。

經過和位元組基礎架構 Flink 團隊的合作,在實時特徵生產升級 ,上線 Cache 大部分場景的 CPU 使用率大概會有高達 50% 左右的收益;

PB IDL 裁剪

在位元組內部的實時特徵離線生成鏈路當中,我們主要依賴的資料流是 Kafka。這些 Kafka 都是通過 PB 定義的資料,欄位繁多。公司級別的大 Topic 一般會有 100+ 的欄位,但大部分的特徵生產任務只使用了其中的部分欄位。對於 Protobuf 格式的資料來源,我們可以完全通過裁剪資料流,Mask 一些非必要的欄位來節省反序列化的開銷。PB 型別的日誌,可以直接裁剪 IDL,保持必要欄位的序號不變,在反序列化的時候會跳過 Unknown Field 的解析,這 對於 CPU 來說是更節省的,但是網路頻寬不會有收益, 預計裁剪後能節省非常多的 CPU 資源。在上線了 PB IDL 裁剪之後,大部分任務的 CPU 收益在 30% 左右。

遇到的問題

新架構特徵生產任務本質就是一個有狀態的 Flink 任務,底層的狀態儲存 StateBackend 主要是本地的 RocksDB。主要面臨兩個比較難解的問題,一是任務 DAG 變化 Checkpoint 失效,二是本地儲存不能很好地支援特徵狀態歷史資料回溯。

  • 實時特徵任務不能動態新增新的特徵:對於一個線上的 Flink 實時特徵生產任務,我們不能隨意新增新的特徵。這是由於引入新的特徵會導致 Flink 任務計算的 DAG 發生改變,從而導致 Flink 任務的 Checkpoint 無法恢復,這對實時有狀態特徵生產任務來說是不能接受的。目前我們的解法是禁止更改線上部署的特徵任務配置,但這也就導致了線上生成的特徵是不能隨便下線的。對於這個問題暫時沒有找到更好的解決辦法,後期仍需不斷探索。
  • 特徵狀態冷啟動問題:目前主要的狀態儲存引擎是 RocksDB,不能很好地支援狀態資料的回溯。

後續規劃

當前新一代架構還在位元組推薦場景中快速演進,目前已較好解決了實時視窗特徵的生產問題。

出於實現統一推薦場景下特徵生產的目的,我們後續會繼續基於 Flink SQL 流批一體能力,在批式特徵生產發力。此外也會基於 Hudi 資料湖技術,完成特徵的實時入湖,高效支援模型訓練場景離線特徵回溯痛點。規則引擎方向,計劃繼續探索 CEP,推動在電商場景有更多落地實踐。在實時視窗計算方向,將繼續深入調研 Flink 原生視窗機制,以期解決目前方案面臨的視窗特徵資料退場問題。

  • 支援批式特徵:這套特徵生產方案主要是解決實時有狀態特徵的問題,而目前位元組離線場景下還有大量批式特徵是通過 Spark SQL 任務生產的。後續我們也會基於 Flink SQL 流批一體的計算能力,提供對批式場景特徵的統一支援,目前也初步有了幾個場景的落地;
  • 特徵離線入湖:基於 Hudi On Flink 支援實時特徵的離線數倉建設,主要是為了支援模型訓練樣本拼接場景離線特徵回溯;
  • Flink CEP 規則引擎支援:Flink SQL 本質上就是一種規則引擎,目前在線上我們把 Flink SQL 作為業務 DSL 過濾語義底層的執行引擎。但 Flink SQL 擅長表達的 ETL 型別的過濾規則,不能表達帶有時序型別的規則語義。在直播、電商場景的時序規則需要嘗試 Flink CEP 更加複雜的規則引擎。
  • Flink Native Windowing 機制引入:對於視窗型別的有狀態特徵,我們目前採用上文所述的抽象 SlotState 時間切片方案統一進行支援。另外 Flink 本身提供了非常完善的視窗機制,通過 Window Assigner、Window Trigger 等元件可以非常靈活地支援各種視窗語義。因此後續我們也會在視窗特徵計算場景引入 Flink 原生的 Windowing 機制,更加靈活地支援視窗特徵迭代。
  • Flink HybridState Backend 架構:目前在位元組的線上場景中,Flink 底層的 StateBackend 預設都是使用 RocksDB 儲存引擎。這種內嵌的儲存引擎不能通過外部機制去提供狀態資料的回灌和多工共享,因此我們需要支援 KV 中心化儲存方案,實現靈活的特徵狀態回溯。
  • 靜態屬性型別特徵統一管理:通過特徵平臺提供統一的 DSL 語義,統一管理其他外部靜態型別的特徵服務。例如一些其他業務團隊維度的使用者分類、標籤服務等。

火山引擎流式計算 Flink 版正在公測中,支援雲中立模式,支援公有云、混合雲及多雲部署,全面貼合企業上雲策略。歡迎申請試用:

Flink.png

掃描二維碼,瞭解更多 Flink 試用資訊 👆