打造通用快取層:位元組跳動 Flink StateBackend 效能提升之路

語言: CN / TW / HK

內容簡介:StateBackend 作為 Flink 向上提供 State 能力的基石,其效能會嚴重影響任務的吞吐。本次分享主要介紹在位元組跳動內部通過為 StateBackend 提供通用快取層,來提高效能的相關優化。

作者|位元組跳動基礎架構研發工程師-李明

一、相關背景

StateBackend 是 Flink 向上提供 State 能力的基石,其效能會嚴重影響任務的吞吐。目前 Flink 提供的生產可用的 Statebackend 主要有兩類,一類是 FsStateBackend,另一類是 RocksDBStateBackend。他們的基本原理都是提供一個 State API 給使用者使用,底層會根據 StateBackend 型別選用不同的儲存來儲存資料。

01.png

FsStateBackend 底層實現是在記憶體中通過 Map 的資料結構來儲存資料,把原始的資料物件直接儲存到記憶體中。這種 StateBackend 的優點是訪問速度特別快,所有操作都是在記憶體中進行,基本沒有額外的 CPU 開銷。缺點是隨著狀態規模的增長,JVM 的 GC 停頓時間也會越來越長,同時狀態規模會受到記憶體的限制。

RocksDBStateBackend 底層選用了 RocksDB 來儲存資料,儲存的狀態規模理論上受限於磁碟,序列化後的結果也會比以 Object 的形式存在記憶體中要小,因此支撐的狀態規模比 FsStateBackend大。另外,RocksDBStateBackend 在 JVM 的 Heap 中沒有額外的狀態資料儲存,對應的 GC 壓力非常低。但是都是以二進位制的形式與 RocksDBStateBackend 互動,這意味著每一次 State 訪問都需要將資料進行序列化/反序列化,會帶來一些額外的 CPU 開銷。

02.png

我們在線上使用這兩種 StateBackend 也遇到了不少痛點:

  1. 線上 SQL 作業的狀態相對比較小,因此會預設配置使用 FsStateBackend 。但是隨著狀態的規模提升,GC 的停頓時間會越來越長,業務開始對這種停頓產生感知。
  2. 在單 Task 的狀態比較大時,一般推薦使用 RocksDBStateBackend,由於 State 操作都是隨機 IO 型別,在非 SSD 機器上的訪問效能比較差,並且在訪問過程中存在額外的序列化和反序列化開銷, CPU 的使用量也會明顯上升,實際使用的資源成本增加了。
  3. 業務在 StateBackend 的選型上比較困難。業務很難預估未來任務狀態規模會有多大,如果發現狀態規模比較大了,需要一些額外的運維操作進行 StateBackend 切換,比如需要製作一個 Savepoint,再從 Savepoint 去進行恢復,這會帶來額外的運維工作。

03.png

因此我們思考是否可以將 FsStateBackend CPU 開銷低和 RocksDBStateBackend 容量大的優點結合起來,解決前文提到的痛點。

社群之前提出了 SpillableStateBackend 設計思想,它是一個 Anti-Caching 的架構,通過記憶體+磁碟進行資料儲存,在執行過程中會根據當前的GC情況,以 KeyGroup 的粒度動態地與磁碟上的資料進行交換,來調整記憶體佔用。

當 Task 狀態特別大的時候,大部分資料會被交換到磁碟上,訪問效能會有較大下降,因此還是以支援小規模狀態為主。另一方面,資料交換的粒度是比較粗的,假如單 Task 的 State 是 1G,分配了5個 KeyGroup,平均1個 KeyGroup 大概是 200M,它會以 200M 的粒度去控制記憶體中需要放入哪些 KeyGroup。而在實際場景中,單 Task 的狀態可能會達到 GB 級,State 的訪問可能隨機到很多 KeyGroup 中。SpillableStateBackend 的設計思想並不能把熱點資料充分儲存到記憶體中,因此不適用於我們的場景。

04.png

Flink SQL 場景中提出了 MiniBatch 的設計思想。 在原來的 SQL 作業流式處理過程中,每來一條資料都會立刻訪問 State,然後產生 Read-Update 操作。增加了 MiniBatch 之後,到來的資料並不會立刻訪問 State,而是先儲存在記憶體中,當這個 Batch 攢夠或者到達設定的等待時間以後,相同 Key 的資料會一起訪問 State,操作完成以後再把 Key 對應的 State 寫回到 Statebackend,從而減少了 State 訪問的次數。MiniBatch 的實現是以 Operator 的粒度進行,如果有新的 Operator 需要利用這個機制,還需要做額外的開發。

此外,業務實踐中也在訪問 State 上做了一些優化,通過在記憶體中增加物件快取,減少序列化和反序列化的開銷。實現上也是以 Operator 為粒度,優化後 CPU 使用有了比較明顯的下降。

二、優化思路

優化思路

前文提到的優化思路更多的是面向小狀態,或者單 Operator 來獨立設計的。但我們希望能有一個統一的優化方案,讓未來所有 Operator 都能自動使用到這樣的優化。

05.png

我們的狀態使用訴求比較高的核心業務場景主要是面向樣本拼接或者流式消重

樣本拼接場景 推薦系統會向用戶推送訊息,比如一批好友、影片等,這些資料會作為推薦資料流進行輸入。使用者看到這些推薦訊息後會產生一些操作,比如加好友、檢視影片、刪除影片等,這些行為會被作為使用者的行為資料流輸入。對這兩條資料流進行資料拼接之後,傳送到下游進行模型訓練,推薦系統會根據模型訓練結果去做一些更優的推薦。

流式消重與樣本拼接類似,推薦系統向用戶推薦影片文章等,使用者可能會觀看其中的一部分影片,系統會對已經觀看的影片進行消重處理,下次推薦的影片中就不會再有已看過的影片。

在上述兩種業務場景中,資料特點比較明顯

  • 首先資料結構大部分是 PB 型別,單個 KV 是幾十 KB 到上百 KB 的量級,序列化和反序列化的開銷也比較高。
  • 第二個特點是狀態的規模比較大,因為它面向的使用者數特別多,State 中一般會儲存每一個使用者操作的明細資料,單個 Task 的狀態規模一般會達到 GB 級。
  • 第三個特點是使用者的行為資料大部分會在同一個時間段到來。

針對上述特點,我們的優化思路是將同一個 Key 對應多條資料的模式看作對於熱點資料的訪問,把這些熱點資料進行快取,這樣就可以減少序列化和反序列化的開銷。

06.png

第二個優化思路是業務運維需求來的。 首先業務希望降低 StateBackend 的選型難度,不希望在未來頻繁切換 StateBackend。其次,快取功能應該是可擴充套件到多種 StateBackend 的,降低開發成本。

最終我們沒有把 StateBackend 的快取功能設計為一個獨立的 StateBackend,而是通過在 StateBackend State API 之間抽象出 Cache Layer,在這一層做了熱點資料快取。 未來底層的 StateBackend 不管怎麼變化,對於我們來說都不需要額外的調整。

快取層架構

下圖是優化方案的整體架構,快取層被拆分成了三部分。

07.png

  1. 第一部分是 CacheManager,它是 TaskManager 中的一個常駐服務,負責管理 TaskManager 中所有 Cache 的註冊和釋放等操作,同時也會監控 Cache 的使用情況。
  2. 第二部分是 StateBackend 封裝成了 CacheKeyedStateBackend,這一層封裝只是對 StateBackend 的請求做了代理,主要負責底層 StateBackend的State 建立和註冊 Cache,將 State 和 Cache 封裝為 CacheKeyedState。
  3. 第三部分是比較重要的 CacheKeyedState,它是面向使用者提供的 State 物件,實際是 Cache 和底層 StateBackend 提供的 State 包裝,訪問的時候會優先操作 Cache 中的資料,如果沒有再將底層提供的 State 中的資料載入到 Cache 中,然後進行操作。

08.png

CacheManager 在模組設計上主要參考了 SpillableStateBackend 的設計思路,分為以下4個部分:

  1. 記憶體管理模組(MemoryManager):Cache 在初始化或者擴縮容時,都需要向 MemoryManager 發起記憶體請求。這個模組主要是保證 TM 中的 Cache 記憶體不會超過配置。
  2. Cache 監控模組(CacheStatusMonitor):收集 TM 中所有 Cache 的執行情況,比如命中率、記憶體使用,這些資訊會被用來進行動態擴縮容。
  3. 記憶體監控模組(HeapStatusManager):週期性地收集 JVM 的記憶體使用情況,然後觸發 Cache 擴縮容。
  4. 動態擴縮容模組(ScalingManager):負責調整 TM 中所有 Cache 使用的記憶體大小,以達到優化記憶體使用、降低 GC 的目的。

09.png

CachedKeyedState 是由 Cache State 封裝而成, 它也分為4個部分。

  1. 第一部分是封裝的 Cache,通過把熱點資料快取到 Cache,實現減少序列化/反序列化開銷的目的。
  2. 第二部分是記憶體預估模組 (MemoryEstimator),根據當前 State 訪問使用的 KV 的資訊進行狀態大小預估,用於估算當前 Cache 佔用的記憶體大小。
  3. 第三部分是 PolicyStats,用於統計單個 Cache 的訪問資訊,並作為指標上報。使用者可以根據這些指標進行 Cache 執行情況監控,根據實際情況調整 Cache 策略。
  4. 最後是 DataSynchronizer 模組,負責 Cache 和底層 StateBackend 的資料同步。這裡沒有由 Cache 直接去操作底層資料,主要是希望可以在這一層做一些優化,比如以 Batch 的模式去寫入。

三、難點&方案

在上述方案的實現過程中,我們遇到了不少難點。

10.png

第一個難點是如何適配不同的業務場景。 這裡的目標是提高快取命中率,命中率越高優化效果就越好。但是在不同的業務場景中,因為業務資料自身的特點,快取的策略可能是不同的。

第二個難點是如何正確進行記憶體管理。 如果記憶體管理不正確,那麼開啟快取後可能會出現記憶體溢位或記憶體洩露,導致任務執行的穩定性降低。

第三個難點是如何自動調整 Cache 分配的記憶體。 如果 Cache 分配的記憶體是固定的,會導致空間上的浪費;另一方面,使用者的使用門檻也會變高,使用者很難評估 Cache 的大小。因此這裡需要記憶體使用可以自動調整,降低 JVM 的 GC 壓力。

最後一個難點是快照製作。 引入了 Cache 之後的快照製作時間會變長,如何降低快照製作的時間?

適配不同業務場景

針對適配不同業務場景的問題,我們把 Cache 的實現拆分成了兩個部分,將 CacheStrategy 設計成可插拔的快取策略,將資料儲存抽象成一個單獨的 StateStore,由 CacheStrategy 來管理 StateStore 中的資料。

StateStore 中的資料是提前內建的資料結構,使用者不需要關心這些資料是如何儲存和組織,可以根據不同業務場景去選擇快取策略,也支援自定義快取策略。

11.png

記憶體管理

針對記憶體管理的問題,主要包含兩個方面。

首先是控制 TaskManager 中所有 Cache 使用的記憶體不超過一個最大配置值, 目前使用者在使用這個功能的時候需要給定一個允許 Cache 使用的記憶體總量最大值,這部分功能由 MemoryManager 來實現。

12.png

在 MemoryManager 中,為了方便記憶體管理,記憶體以 Block 的粒度進行了劃分,在 Cache 做擴縮容或進行註冊、釋放操作的時候,都會通過 MemoryManager 去進行 Cache 的記憶體分配和回收,最終保證 Cache 使用的總記憶體不會超過使用者配置的最大值。

13.png

當 TaskManager 中 Cache 使用的總記憶體有了保證之後,第二部分就是需要保證單個 Cache 使用的記憶體不會超過分配給它的記憶體, 這項功能由三個子模組來協同配合完成。

首先,訪問 Cache 的 State 時,會通過 MemoryEstimator 模組進行狀態大小的抽樣預估,主要是根據當前訪問的 KV 進行序列化操作,根據序列化結果再結合過去一段時間內訪問資訊的平均值,作為當前 Cache 中資料的平均大小,並將其上報到 PolicyStats 中。根據 Cache 資料平均大小以及當前 Cache 中的資料條數,就可以衡量出它的記憶體佔用。

訪問的過程中會根據當前 Cache 的記憶體大小做記憶體檢查,如果當前記憶體已經溢位,則會根據快取策略去增量清理一些資料。這裡的增量清理是指清理時並不會一次性把所有溢位的資料全部清理掉,而是每次只清理 2~3 條資料,因為在 GC 壓力很高的場景中下會觸發 Cache 縮容來保證的 GC 能夠快速恢復正常。如果一次性全部清理掉,服務的停頓時間比較長,業務會產生一些抖動。

動態擴縮容

針對動態調整 Cache 記憶體使用。我們把它拆分成三個子問題。

第一個問題是什麼時候觸發動態擴縮容? Cache 使用的是 JVM 的 Heap 記憶體,擴縮容操作需要根據 JVM 的 Heap 使用情況來決定。因此會根據 JVM 最近一段時間的 GC 情況來衡量 Heap 的記憶體資源是否比較緊張。

14.png

我們抽象出一個 HeapStatusMonitor 模組,它會週期性地收集 JVM 的 GC 資訊。啟動的時候在 JVM 中註冊一個 GcNotificationListener 監聽器,JVM 發生 GC 時會通過 Listener 回撥給 HeapStatusMonitor。根據回撥資訊評估本次 GC 的耗時以及回收的記憶體、GC 後剩餘的記憶體等。有了GC 結果之後,HeapStatusMonitor 會將這些資訊彙總並上報給 ScalingManager,由 ScalingManager 來決定是否要進行擴縮容。

目前的判斷條件是 GC 頻率和 GC 耗時:我們設定了兩個閾值,超過閾值則會觸發縮容。如果 GC 後剩餘的記憶體比較多,說明大部分都是生命週期比較短的資料,當 GC 後使用的記憶體低於閾值則進行擴容。判斷時會優先判斷縮容條件,以降低記憶體的壓力。

第二個問題是本次擴縮容的大小。 主要由 ScalingManager 向 MemoryManager 發起 Scale 記憶體的請求。MemoryManager 會根據當前 Cache 記憶體的使用情況,按照一定比例來計算應該擴縮容的 Block數。

15.png

第三個問題是需要選擇哪些 Cache 擴縮容。 在實際場景中,由於不同的 State 例項狀態大小是不同的,因此我們對 TaskManager 中所有 Cache 進行了權重排序,最終挑選出 2~3 個權重比較高的 Cache 來進行擴縮容。

16.png

第一個權重欄位是從 Statebackend Load 成功的次數, 這裡沒有用請求次數的原因主要有兩個:第一,在實際的業務場景中可能會經常去訪問一些不存在的 State,導致 Cache 的命中率比較低;第二,有一些 State 雖然請求次數特別高,但是 State 的量比較小。而 Load 成功的次數更能表明當前有多少資料儲存在外存中並且需要被 Load 到記憶體。

第二個欄位是當前 Cache 分配的記憶體大小,Cache 越大,越不應該被繼續擴容,相反在縮容的時候應該優先挑選它。

此外,我們參考 SpillableStateBackend 的權重計算方式對欄位進行了歸一化處理, 主要目的是消除量綱的影響。

快照製作

針對快照製作最初有兩個實現方案,一是同步階段 Flush 所有資料到底層的 StateBackend,實現比較簡單但同步階段的耗時會升高;第二個方案是類似於 FsStateBackend 的方式,利用 CopyOnWrite 的 Map 去進行非同步快照,實現成本比較高。我們暫時選擇了方案一來實現。

17.png

經過線上測試和驗證,Cache 在分配 64MB 資料的時候,同步階段的耗時在一秒鐘之內。但是業務會根據實際場景去調整一些配置,比如把 Cache 的記憶體配置升高,因此我們還設計了一個高低水位線機制 把 Cache 中的資料劃分成了三個部分,第一部分是經常修改的熱點資料,第二部分是發生了修改但相對不那麼頻繁的資料,最後一部分是經常訪問但並未進行修改或修改後已經同步到了底層的資料,並將這三部分的分界線作為高低水位線的劃分區間。

當 Cache 中修改的資料量超過低水位線時,只會在 Cache 發生 Evict 事件時去做同步操作,將若干條資料同步到底層的 StateBackend 中;超過高水位線時,意味著當前更新的資料在 Cache 中佔比較高,需要保證同步的資料能夠快速下降,因此每次訪問 Cache 時都會同步若干條資料到底層儲存中,直到降低到低水位線。這樣,在快照製作的同步階段就可以保證 Cache 中的待同步資料是低於高水位線的,從而控制快照製作的同步耗時。

四、業務收益

接下來介紹一下方案上線以後取得的一些業務收益。

18.png

首先是 Benchmark 的對比。 這裡直接使用了社群的 Benchmark 預設配置,Cache 設定的記憶體為 64M。優化的效果比較明顯,在 State 點查操作的效能上可以提升三倍以上。

19.png

在線上任務中,我們對比了樣本拼接和流式消重兩個場景, CPU 的使用量基本上都下降50%以上,而記憶體使用上升了大概40%~50%,主要是因為存了更多資料到記憶體中,同時 JVM 需要依賴 GC 來回收這些資料,因此平均記憶體使用有比較明顯的上升。但整體成本是下降的,因為 CPU 資源比記憶體資源的成本比大約是1:9,換算成總體成本,大概下降了20%~40%。

五、未來規劃

20.png

我們的未來規劃主要集中在以下四個方面:

第一,結合業務場景去探索一些更好的 Cache 快取策略。

第二,提升 StateBackend 的恢復效能。 目前在超大狀態的恢復效能上,主要還是沿用了社群原有的恢復機制,不管是 Savepoint 還是擴縮容場景,在 Task 狀態比較大的時候,做一次恢復可能會耗時10分鐘甚至20分鐘。因此我們希望能夠在恢復機制上做一些優化,提升恢復效能。

第三,優化流式計算場景中 Compaction 的策略。單 Task 狀態很大尤其是達到 GB 級以上時,隨著 Checkpoint 的製作,它會呈現出一個週期性 CPU 業務毛刺。檢視 RocksDB 日誌之後,發現是由於觸發了多次 Compaction 導致的,因此我們希望做一些流式計算場景 Compaction 策略探索。

最後,探索新的 StatsBackend。 目前我們主要使用的還是社群提供的 StatsBackend,更適合於狀態規模、併發規模相對來講不是特別大的場景。在實際使用過程中也遇到了很多問題,比如說 HDFS 的請求數過高、磁碟 IO 打滿、依賴 SSD 磁碟等。因此我們希望能夠探索一些新的 StatsBackend,比如 RemoteStatsBackend,把資料儲存到分散式儲存中,降低本地磁碟的依賴,同時恢復的時候也不需要把資料全量拉取到本地,從而提高快照製作以及狀態恢復的速度。

目前,位元組跳動流式計算團隊同步支援的火山引擎流式計算 Flink 版正在公測中,支援雲中立模式,支援公共雲、  混合雲 及多雲部署,全面貼合企業上雲策略,歡迎申請試用:https://www.volcengine.com/product/flink

 

獲取更多產品資訊,歡迎關注公眾號【位元組跳動雲原生計算】