位元組跳動使用 Flink State 的經驗分享

語言: CN / TW / HK

前言

Flink 作業需要藉助 State 來完成聚合、Join 等有狀態的計算任務,而 State 也一直都是作業調優的一個重點。目前 State 和 Checkpoint 已經在位元組跳動內部被廣泛使用,業務層面上 State 支援了資料整合、實時數倉、特徵計算、樣本拼接等典型場景;作業型別上支援了 Map-Only 型別的通道任務、ETL 任務,視窗聚合計算的指標統計任務,多流 Join 等儲存資料明細的資料拼接任務。

以 WordCount 為例,假設我們需要統計 60 秒視窗內 Word 出現的次數:

``` select

word,

TUMBLE_START(eventtime, INTERVAL '60' SECOND) as t,

count(1)

from

words_stream

group by

TUMBLE(eventtime, INTERVAL '60' SECOND), word

```

每個還未觸發的 60s 視窗內,每個 Word 對應的出現次數就是 Flink State,視窗每收到新的資料就會更新這個狀態直到最後輸出。為了防止作業失敗,狀態丟失,Flink 引入了分散式快照 Checkpoint 的概念,定期將 State 持久化到 Hdfs 上,如果作業 Failover,會從上一次成功的 checkpoint 恢復作業的狀態(比如 kafka 的 offset,視窗內的統計資料等)。

在不同的業務場景下,使用者往往需要對 State 和 Checkpoint 機制進行調優,來保證任務執行的效能和 Checkpoint 的穩定性。閱讀下方內容之前,我們可以回憶一下,在使用 Flink State 時是否經常會面臨以下問題:

  • 某個狀態運算元出現處理瓶頸時,加資源也沒法提高效能,不知該如何排查效能瓶頸
  • Checkpoint 經常出現執行效率慢,barrier 對齊時間長,頻繁超時的現象
  • 大作業的 Checkpoint 產生過多小檔案,對線上 HDFS 產生小檔案壓力
  • RocksDB 的引數過多,使用的時候不知該怎麼選擇
  • 作業擴縮容恢復時,恢復時間過長導致線上斷流

State 及 RocksDB 相關概念介紹

State 分類

| State 分類 | StateBackend | 儲存形式 | 使用場景 | | ------------------------ | ---------------------------- | ------------------------------------------------------------------------------ | -------------------------------------------------------------------------------------------------------- | | OperatorState | DefaultOperatorStateBackend | ListState/UnionState 使用 ArrayList 進行儲存;BroadcastState 使用 HashMap 進行儲存;在記憶體中進行儲存 | Non-Keyed Operator 中使用的狀態,主要儲存少量元資訊資料,通常為 KB 級別:比如 Kafka Connector 中使用 Union State 儲存 partition 和 offset | | KeyedState | HeapKeyedStateBackend | 使用 StateTable(類似 HashMap) 的結構在記憶體中儲存狀態資料 | Keyed Operator 中使用的狀態,狀態大小受限於 TaskManager 記憶體和 GC 壓力,通常為 MB 級別:比如小流量的視窗聚合等統計場景 | | | RocksDBKeyedStateBackend | 基於 RocksDB 儲存狀態資料,資料儲存於記憶體和磁碟 | Keyed Operator 中使用的狀態,狀態大小可以遠超出 TaskManager 記憶體,通常為 GB 級別;比如大流量的多流 Join、去重運算元等場景 | |

由於 OperatorState 背後的 StateBackend 只有 DefaultOperatorStateBackend,所以使用者使用時通常指定的 FsStateBackend 和 RocksDBStateBackend 兩種,實際上指定的是 KeyedState 對應的 StateBackend 型別:

  • FsStateBackend:DefaultOperatorStateBackend 和 HeapKeyedStateBackend 的組合
  • RocksDBStateBackend:DefaultOperatorStateBackend 和 RocksDBKeyedStateBackend 的組合

RocksDB 介紹

RocksDB 是嵌入式的 Key-Value 資料庫,在 Flink 中被用作 RocksDBStateBackend 的底層儲存。如下圖所示,RocksDB 持久化的 SST 檔案在本地檔案系統上通過多個層級進行組織,不同層級之間會通過非同步 Compaction 合併重複、過期和已刪除的資料。在 RocksDB 的寫入過程中,資料經過序列化後寫入到 WriteBuffer,WriteBuffer 寫滿後轉換為 Immutable Memtable 結構,再通過 RocksDB 的 flush 執行緒從記憶體 flush 到磁碟上;讀取過程中,會先嚐試從 WriteBuffer 和 Immutable Memtable 中讀取資料,如果沒有找到,則會查詢 Block Cache,如果記憶體中都沒有的話,則會按層級查詢底層的 SST 檔案,並將返回的結果所在的 Data Block 載入到 Block Cache,返回給上層應用。

圖2.png

RocksDBKeyedStateBackend 增量快照介紹

這裡介紹一下大家在大狀態場景下經常需要調優的 RocksDBKeyedStateBackend 增量快照。RocksDB 具有 append-only 特性,Flink 利用這一特性將兩次 checkpoint 之間 SST 檔案列表的差異作為狀態增量上傳到分散式檔案系統上,並通過 JobMaster 中的 SharedStateRegistry 進行狀態的註冊和過期。

圖3.png

如上圖所示,Task 進行了 3 次快照(假設作業設定保留最近 2 次 Checkpoint):

  • CP-1:RocksDB 產生 sst-1 和 sst-2 兩個檔案,Task 將檔案上傳至 DFS,JM 記錄 sst 檔案對應的引用計數
  • CP-2:RocksDB 中的 sst-1 和 sst-2 通過 compaction 生成了 sst-1,2,並且新生成了 sst-3 檔案,Task 將兩個新增的檔案上傳至 DFS,JM 記錄 sst 檔案對應的引用計數
  • CP-3:RocksDB 中新生成 sst-4 檔案,Task 將增量的 sst-4 檔案上傳至 DFS,且在 CP-3 完成後,由於只保留最近 2 次 CP,JobMaster 將 CP-1 過期,同時將 CP-1 中的 sst 檔案對應的引用計數減 1,並刪除引用計數歸 0 的 sst 檔案(sst-1 和 sst-2)

增量快照涉及到 Task 多執行緒上傳/下載增量檔案,JobMaster 引用計數統計,以及大量與分散式檔案系統的互動等過程,相對其他的 StateBackend 要更為複雜,在 100+GB 甚至 TB 級別狀態下,作業比較容易出現效能和穩定性瓶頸的問題。

State 實踐經驗

提升 State 操作效能

使用者在使用 State 時,會發現操作 State 並不是一件很"容易"的事情,如果使用 FsStateBackend,會經常遇到 GC 問題、頻繁調參等問題;如果使用 RocksDBStateBackend,涉及到磁碟讀寫,物件序列化,在缺乏相關 Metrics 的情況下又不是很容易進行效能問題的定位,或者面對 RocksDB 的大量引數不知道如何調整到最優。

目前位元組跳動內有 140+ 作業的狀態大小達到了 TB 級別,單作業的最大狀態為 60TB,在逐步支援大狀態作業的實踐中,我們積累了一些 State 的調優經驗,也做了一些引擎側的改造以支援更好的效能和降低作業調優成本。

選擇合適的 StateBackend

我們都知道 FsStateBackend 適合小狀態的作業,而 RocksDBStateBackend 適合大狀態的作業,但在實際選擇 FsStateBackend 時會遇到以下問題:

  • 進行開發之前,對狀態大小無法做一個準確的預估,或者做狀態大小預估的複雜度較高
  • 隨著業務增長,所謂的 "小狀態" 很快就變成了 "大狀態",需要人工介入做調整
  • 同樣的狀態大小,由於狀態過期時間不同,使用 FsStateBackend 產生 GC 壓力也不同

針對上面 FsStateBackend 中存在的若干個問題,可以看出 FsStateBackend 的維護成本還是相對較高的。在位元組內部,我們暫時只推薦部分作業總狀態小於 1GB 的作業使用 FsStateBackend,而對於大流量業務如短影片、直播、電商等,我們更傾向於推薦使用者使用 RocksDBStateBackend 以減少未來的 GC 風險,獲得更好的穩定性。

隨著內部硬體的更新迭代,ssd 的推廣,長遠來看我們更希望將 StateBackend 收斂到 RocksDBStateBackend 來提高作業穩定性和減少使用者運維成本;效能上期望在小狀態場景下,RocksDBStateBackend 可以和 FsStateBackend 做到比較接近或者打平。

觀測效能指標,使用火焰圖分析瓶頸

社群版本的 Flink 使用 RocksDBStateBackend 時,如果遇到效能問題,基本上是很難判斷出問題原因,此時建議開啟相關指標進行排查[1]。另外,在位元組跳動內部,造成 RocksDBStateBackend 效能瓶頸的原因較多,我們構建了一套較為完整的 RocksDB 指標體系,並在 Flink 層面上預設透出了部分關鍵的 RocksDB 指標,並新增了 State 相關指標,部分指標的示意圖如下:

造成 RocksDB 效能瓶頸的常見如下:

  • 單條記錄的 State Size 過大,由於 RocksDB 的 append-only 的特性,write buffer 很容易打滿,造成資料頻繁刷盤和 Compaction,搶佔作業 CPU
  • Operator 內部的 RocksDB 容量過大,如 Operator 所在的 RocksDB 例項大小超過 15GB 我們就會比較明顯地看到 Compaction 更加頻繁,並且造成 RocksDB 頻繁的 Write Stall
  • 硬體問題,如磁碟 IO 打滿,從 State 操作的 Latency 指標可以看出來,如果長時間停留在秒級別,說明硬體或者機器負載偏高

除了以上指標外,另外一個可以相配合的方法是火焰圖,常見方法比如使用阿里的 arthas[2]。火焰圖內部會展示 Flink 和 RocksDB 的 CPU 開銷,示意圖如下:

如上所示,可以看出火焰圖中 Compaction 開銷是佔比非常大的,定位到 Compaction 問題後,我們可以再根據 Value Size、RocksDB 容量大小、作業並行度和資源等進行進一步的分析。

使用合理的 RocksDB 引數

除了 Flink 中提供的 RocksDB 引數[3]之外,RocksDB 還有很多調優引數可供使用者使用。使用者可以通過自定義 RocksDBOptionsFactory 來做 RocksDB 的調優[4]。經過內部的一些實踐,我們列舉兩個比較有效的引數:

  • 關閉 RocksDB 的 compression(需要自定義 RocksDBOptionsFactory):RocksDB 預設使用 snappy 演算法對資料進行壓縮,由於 RocksDB 的讀寫、Compaction 都存在壓縮的相關操作,所以在對 CPU 敏感的作業中,可以通過ColumnFamilyOptions.setCompressionType(CompressionType.NO_COMPRESSION) 將壓縮關閉,採用磁碟空間容量換 CPU 的方式來減少 CPU 的損耗
  • 開啟 RocksDB 的 bloom-filter(需要自定義 RocksDBOptionsFactory):RocksDB 預設不使用 bloom-filter[5],開啟 bloom-filter 後可以節省一部分 RocksDB 的讀開銷
  • 其他 cache、writebuffer 和 flush/compaction 執行緒數的調整,同樣可以在不同場景下獲得不同的收益,比如在寫少多讀的場景下,我們可以通過調大 Cache 來減少磁碟 IO

這裡要注意一點,由於很多引數都以記憶體或磁碟來換取效能上的提高,所以以上引數的使用需要結合具體的效能瓶頸分析才能達到最好的效果,比如在上方的 火焰圖 中可以明顯地看到 snappy 的壓縮佔了較大的 CPU 開銷,此時可以嘗試 compression 相關的引數。

關注 RocksDBStateBackend 的序列化開銷

使用 RocksDB State 的相關 API,Key 和 Value 都是需要經過序列化和反序列化,如果 Java 物件較複雜,並且使用者沒有自定義 Serializer,那麼它的序列化開銷也會相對較大。比如去重操作中常用的 RoaringBitmap,在序列化和反序列化時,MB 級別的物件的序列化開銷達到秒級別,這對於作業效能是非常大的損耗。因此對於複雜物件,我們建議:

  • 業務上嘗試在 State 中使用更精簡的資料結構,去除不需要儲存的欄位
  • StateDescriptor 中通過自定義 Serializer 來減小序列化開銷
  • 在 KryoSerializer 顯式註冊 PB/Thrift Serializer[6]
  • 減小 State 的操作次數,比如下方的示例程式碼,如果是使用 FsStateBackend ,則沒有太多效能損耗;但是在 RocksDBStateBackend 上因為兩次 State 的操作導致 userKey 產生了額外一次序列化的開銷,如果 userKey 本身是個相對複雜的物件就要注意了

``` if (mapState.contains(userKey)) {

UV userValue = mapState.get(userKey);

} ```

更多關於序列化的效能和指導可以參考社群的調優文件[7]。

構建 RocksDB State 的快取

上面提到 RocksDB 的序列化開銷可能會比較大,位元組跳動內部在 StateBackend 和 Operator 中間構建了 StateBackend Cache Layer,負責快取運算元內部的熱點資料,並且根據 GC 情況進行動態擴縮容,對於有熱點的作業收益明顯。

同樣,對於使用者而言,如果作業熱點明顯的話,可以嘗試在記憶體中構建一個簡單的 Java 物件的快取,但是需要注意以下幾點:

  • 控制快取的閾值,防止快取物件過多造成 GC 壓力過大
  • 注意快取中 State TTL 邏輯處理,防止出現髒讀的情況

降低 Checkpoint 耗時

Checkpoint 持續時間和很多因素相關,比如作業反壓、資源是否足夠等,在這裡我們從 StateBackend 的角度來看看如何提高 Checkpoint 的成功率。一次 Task 級別的快照可以劃分為以下幾個步驟:

圖6.png

  • 等待 checkpointLock:Source Task 中,觸發 Checkpoint 的 Rpc 執行緒需要等待 Task 執行緒完成當前資料處理後,釋放 checkpointLock 後才能觸發 checkpoint,這一步的耗時主要取決於使用者的處理邏輯及每條資料的處理時延
  • 收集 Barrier: 非 Source 的 Task 中,這一步是將上游所有 Task 傳送的 checkpoint barrier 收集齊,這一步的耗時主要在 barrier 在 buffer 佇列中的排隊時間
  • 同步階段:執行使用者自定義的 snapshot 方法以及 StateBackend 上的元資訊快照,比如 FsStateBackend 在同步階段會對記憶體中的狀態結構做淺拷貝
  • 非同步階段:將狀態資料或檔案上傳到 DFS

位元組跳動內部,我們也針對這四個步驟構建了相關的監控看板:

生產環境中,「等待 checkpointLock」和「同步階段」更多是在業務邏輯上的耗時,通常耗時也會相對較短;從 StateBackend 的層面上,我們可以對「收集 Barrier」和「非同步階段」這兩個階段進行優化來降低 Checkpoint 的時長。

減少 Barrier 對齊時間

減少 Barrier 對齊時間的核心是降低 in-flight 的 Buffer 總大小,即使是使用社群的 Unaligned Checkpoint 特性,如果 in-flight 的 Buffer 數量過多,會導致最後寫入到分散式儲存的狀態過大,有時候 in-flight 的 Buffer 大小甚至可能超過 State 本身的大小,反而會對非同步階段的耗時產生負面影響。

  • 降低 channel 中 Buffer 的數量:Flink 1.11 版本支援在資料傾斜的環境下限制單個 channel 的最大 Buffer 數量,可以通過 taskmanager.network.memory.max-buffers-per-channel 引數進行調整
  • 降低單個 Buffer 的大小:如果單條資料 Size 在 KB 級別以下,我們可以通過降低 taskmanager.memory.segment-size 來減少單個 Buffer 的大小,從而減少 Barrier 的排隊時間

結合業務場景降低 DFS 壓力

如果在你的叢集中,所有 Flink 作業都使用同一個 DFS 叢集,那麼業務增長到一定量級後,DFS 的 IO 壓力和吞吐量會成為「非同步階段」中非常重要的一個參考指標。尤其是在 RocksDBStateBackend 的增量快照中,每個 Operator 產生的狀態檔案會上傳到 DFS中,上傳檔案的數量和作業並行度、作業狀態大小呈正比。而在 Flink 並行度較高的作業中,由於各個 Task 的快照基本都在同一時間發生,所以幾分鐘內,對 DFS 的寫請求數往往能夠達到幾千甚至上萬。

  • 合理設定 state.backend.fs.memory-threshold 減小 DFS 檔案數量:此引數表示生成 DFS 檔案的最小閾值,小於此閾值的狀態會以 byte[] 的形式封裝在 RPC 請求內傳給 JobMaster 並持久化在 _metadata 裡)。

    • 對於 Map-Only 型別的任務,通常狀態中儲存的是元資訊相關的內容(如 Kafka 的消費位移),狀態相對較小,我們可以通過調大此引數避免將這些狀態落盤。Flink 1.11 版本之前,state.backend.fs.memory-threshold 預設的 1kb 閾值較小,比較容易地導致每個並行度都需要上傳自己的狀態檔案,上傳檔案個數和並行度成正比。我們可以結合業務場景調整此引數,將 DFS 的請求數從 N(N=並行度) 次優化到 1 次
    • 這裡需要注意,如果閾值設定過高(MB級別),可能會導致 _metadata 過大,從而增大 JobMaster 恢復 Checkpoint 元資訊 和部署 Task 時的 GC 壓力,導致 JobMaster 頻繁 Full GC
  • 合理設定 state.backend.rocksdb.checkpoint.transfer.thread.num 執行緒數減少 DFS 壓力:此引數表示製作快照時上傳和恢復快照時下載 RocksDB 狀態檔案的執行緒數。

    • 在狀態較大的情況下,使用者為了提高 Checkpoint 效率,可能會將此執行緒數設定的比較大,比如超過 10,在這種情況下快照製作和快照恢復都會給 DFS 帶來非常大的瞬時壓力,尤其是對 HDFS NameNode,很有可能瞬間佔滿 NameNode 的請求資源,影響其他正在執行的作業
  • 調大 state.backend.rocksdb.writebuffer.size:此引數表示 RocksDB flush 到磁碟之前,在記憶體中儲存的資料大小。

    • 如果作業的吞吐比較高,Update 比較頻繁,造成了 RocksDB 目錄下的檔案過多,通過調大此引數可以一定程度上通過加大檔案大小來減少上傳的檔案數量,減少 DFS IO 次數。

合併 RocksDBKeyedStateBackend 上傳的檔案(FLINK-11937)

在社群版本的增量快照中,RocksDB 新生成的每個 SST 檔案都需要上傳到 DFS,以 HDFS 為例,HDFS 的預設 Block 大小通常在 100+MB(位元組跳動內部是 512MB),而 RocksDB 生成的檔案通常為 100MB 以下,對於小資料量的任務甚至是 KB 級別的檔案大小,Checkpoint 產生的大量且頻繁的小檔案請求,對於 HDFS 的元資料管理和 NameNode 訪問都會產生比較大的壓力。

社群在 FLINK-11937 中提出了將小檔案合併上傳的思路,類似的,在位元組內部的實現中,我們將小檔案合併的邏輯抽象成 Strategy,這樣我們可以根據 SST 檔案數量、大小、存活時長等因素實現符合我們自己業務場景的上傳策略。

提高 StateBackend 恢復速度

除了 State 效能以及 DFS 瓶頸之外,StateBackend 的恢復速度也是實際生產過程中考慮的一個很重要的點,我們在生產過程中會發現,由於某些引數的設定不合理,改變作業配置和併發度會導致作業在重啟時,從快照恢復時效能特別差,恢復時間長達十分鐘以上。

謹慎使用 Union State

Union State 的特點是在作業恢復時,每個並行度恢復的狀態是所有並行度狀態的並集,這種特性導致 Union State 在 JobMaster 狀態分配和 TaskManager 狀態恢復上都比較重:

  • JobMaster 需要完成一個 NN 的遍歷,將每個並行度的狀態都賦值成所有並行度狀態的並集。(這裡實際上可以使用 HashMap 將遍歷優化成 N1 的複雜度[8])
  • TaskManager 需要讀取全量 Union State 的狀態檔案,比如 1000 並行度的作業在恢復時,每個並行度中的 Union State 在恢復狀態時都需要讀取 1000 個並行度 Operator 所產生的狀態檔案,這個操作是非常低效的。(我們內部的優化是將 Union State 狀態在 JobMaster 端聚合成 1 個檔案,這樣 TaskManager 在恢復時只需要讀取一個檔案即可)

Union State 在實際使用中,除恢復速度慢的問題外,如果使用不當,對於 DFS 也會產生大量的壓力,所以建議在高並行度的作業中,儘量避免使用 Union State 以降低額外的運維負擔。

增量快照 vs 全量快照恢復

RocksDBStateBackend 中支援的增量快照和全量快照(或 Savepoint),這兩種快照的差異導致了它們在不同場景下的恢復速度也不同。其中增量快照是將 RocksDB 底層的增量 SST 檔案上傳到 DFS;而全量快照是遍歷 RocksDB 例項的 Key-Value 並寫入到 DFS。

以是否擴縮容來界定場景,這兩種快照下的恢復速度如下:

| | 非擴縮容場景 | 擴縮容場景 | | -------------------- | ---------- | --------- | | 增量快照 | 快 | 慢 | | 全量快照 / Savepoint | 中等 | 中等 |

  • 非擴縮容場景:

    • 增量快照的恢復只需將 SST 檔案拉到本地即可完成 RocksDB 的初始化 (多執行緒)
    • 全量快照的恢復需要遍歷屬於當前 Subtask 的 KeyGroup Range 下的所有鍵值對,寫入到本地磁碟並完成 RocksDB 初始化 (單執行緒)
  • 擴縮容場景:

    • 增量快照的恢復涉及到多組 RocksDB 的資料合併,涉及到多組 RocksDB 檔案的下載以及寫入到同一個 RocksDB 中產生的大量 Compaction,Compaction 過程中會產生嚴重的寫放大
    • 全量快照的恢復和上面的非擴縮容場景一致 (單執行緒)

這裡比較麻煩的一點是擴縮容恢復時比較容易遇到長尾問題,由於單個並行度狀態過大而導致整體恢復時間被拉長,目前在社群版本下還沒有比較徹底的解決辦法,我們也在針對大狀態的作業進行恢復速度的優化,在這裡基於社群已支援的功能,在擴縮容場景下給出一些加快恢復速度的建議:

  • 擴縮容恢復時儘量選擇從 Savepoint 進行恢復,可以避免增量快照下多組 Task 的 RocksDB 例項合併產生的 Compaction 開銷
  • 調整 RocksDB 相關引數,調大 WriteBuffer 大小和 Flush/Compaction 執行緒數,增強 RocksDB 批量將資料刷盤的能力

| 引數 | 含義 | 預設值 | 建議值 | | ------------------------------------------- | ---------------------------------------------------------------------------------------------------------- | ---- | --------------------- | | state.backend.rocksdb.writebuffer.count | RocksDB儲存在記憶體中的memtable數量,提高這個值可以增加RocksDB的讀寫效能 | 2 | 4,記憶體充足可設定為6或8 | | state.backend.rocksdb.writebuffer.size | RocksDB儲存在記憶體中的單個memtable大小,達到配置值後轉為immutable memtable。提高這個值可以增加RocksDB的寫效能 | 64MB | 128M,記憶體充足時可設定為256M | | state.backend.rocksdb.flush.thread.num | RocksDB後臺用於將immutable memtable flush到磁碟的執行緒數,當immutable不能及時刷到磁碟時,會降低rocksdb的write速率。提高這個執行緒數可以提高rocksdb的寫入效能 | 1 | writebuffer.count - 1 | | state.backend.rocksdb.compaction.thread.num | RocksDB後臺會不斷進行compact操作,來對過期、重複的資料進行清理,從而減少磁碟檔案的大小。 | 1 | 2 ~ 4 |

總結

本篇文章中,我們介紹了 State 和 RocksDB 的相關概念,並針對位元組跳動內部在 State 應用上遇到的問題,給出了相關實踐的建議,希望大家在閱讀本篇文章之後,對於 Flink State 在日常開發工作中的應用,會有更加深入的認識和了解。

目前,位元組跳動流式計算團隊同步支援的火山引擎流式計算 Flink 版正在公測中,支援雲中立模式,支援公共雲、混合雲及多雲部署,全面貼合企業上雲策略,歡迎申請試用: