JRC Flink流作業調優指南

語言: CN / TW / HK

作者:京東物流 康琪

本文綜合Apache Flink原理與京東實時計算平台(JRC)的背景,詳細講述了大規模Flink流作業的調優方法。通過閲讀本文,讀者可瞭解Flink流作業的通用調優措施,並應用於生產環境。

寫在前面

Apache Flink作為Google Dataflow Model的工業級實現,經過多年的發展,如今已經成為流式計算開源領域的事實標準。它具有高吞吐、低時延、原生流批一體、高一致性、高可用性、高伸縮性的特徵,同時提供豐富的層級化API、時間窗口、狀態化計算等語義,方便用户快速入門實時開發,構建實時計算體系。

古語有云,工欲善其事,必先利其器。要想讓大規模、大流量的Flink作業高效運行,就必然要進行調優,並且理解其背後的原理。本文是筆者根據過往經驗以及調優實踐,結合京東實時計算平台(JRC)背景產出的面向專業人員的Flink流作業調優指南。主要包含以下四個方面:

  • TaskManager內存模型調優
  • 網絡棧調優
  • RocksDB與狀態調優
  • 其他調優項

本文基於Flink 1.12版本。閲讀之前,建議讀者對Flink基礎組件、編程模型和運行時有較深入的瞭解。

01 TaskManager內存模型調優

1.1 TaskManager內存模型與參數

目前的Flink TaskManager內存模型是1.10版本確定下來的,官方文檔中給出的圖示如下。在高版本Flink的Web UI中,也可以看到這張圖。

圖1 TaskManager內存模型

下面來看圖説話,分區域給出比官方文檔詳細一些的介紹。t.m.即為taskmanager. memory.前綴的縮寫。

1.2 平台特定參數

除了TaskManager內存模型相關的參數之外,還有一些平台提供的其他參數,列舉如下。

1.3 TM/平台參數與JVM的關係

上述參數與TaskManager JVM本身的參數有如下的對應關係:

  • -Xms | -Xmx → t. m. framework. heap. size + t. m. task. heap. size
  • -Xmn → -Xmx * apus. taskmanager. heap. newsize. ratio
  • -XX: Max Direct Memory Size → t. m. framework. off- heap. size + t. m. task. off- heap. size + $network
  • -XX: Max Metaspace Size → t. m. jvm- metaspace. size

另外,還可以通過env.java.opts.{jobmanager | taskmanager}配置項來分別設定JM和TM JVM的附加參數。

1.4 內存分配示例

下面以在生產環境某作業中運行的8C / 16G TaskManager為例,根據以上規則,手動計算各個內存分區的配額。注意有部分參數未採用默認值。

t.m.process.size = 16384
t.m.flink.size 
  = t.m.process.size * apus.memory.incontainer.available.ratio 
  = 16384 * 0.9 = 14745.6
t.m.jvm-metaspace.size 
  = [t.m.process.size - t.m.flink.size] * apus.metaspace.incutoff.ratio 
  = [16384 - 14745.6] * 0.25 = 409.6
$overhead 
  = MIN{t.m.process.size * t.m.jvm-overhead-fraction, t.m.jvm-overhead.max} 
  = MIN{16384 * 0.1, 1024} = 1024
$network 
  = MIN{t.m.flink.size * t.m.network.fraction, t.m.network.max} 
  = MIN{14745.6 * 0.3, 5120} = 4423.68
$managed 
  = t.m.flink.size * t.m.managed.fraction 
  = 14745.6 * 0.25 = 3686.4
t.m.task.off-heap.size 
  = t.m.flink.size * apus.taskmanager.memory.task.off-heap.fraction 
  = 14745.6 * 0.01 = 147.4
t.m.task.heap.size 
  = t.m.flink.size - $network - $managed - t.m.task.off-heap.size - t.m.framework.heap.size - t.m.framework.off-heap.size 
  = 14745.6 - 4423.68 - 3686.4 - 147.4 - 128 - 128 = 6232.12

與Web UI中展示的內存配額做比對,可發現完全吻合。

圖2 Web UI展示的內存分配情況

1.5 調優概覽

理解TaskManager內存模型是開展調優的大前提,進行調優的宗旨就是:合理分配,避免浪費,保證性能。下面先對比較容易出現問題的三塊區域做簡要的解説。

1.關於任務堆外內存

平台方的解釋是有些用户的作業需要這部分內存,但從Flink Runtime的角度講,主要是批作業(如Sort-Merge Shuffle過程)會積極地使用它。相對地,流作業很少涉及這一部分,除非用户代碼或用户引用的第三方庫直接操作了DirectByteBuffer或Unsafe之類。所以一般可以優先保證堆內存,即嘗試將
apus.t.m.task.off-heap.fraction再調小一些(如0.05),再觀察作業運行是否正常。

2.關於託管內存

如果使用RocksDB狀態後端,且狀態數據量較大或讀寫較頻繁,建議適當增加t.m.managed.fraction,如0.2~0.5,可配合RocksDB監控決定。如果不使用RocksDB狀態後端,可設為0,因為其他狀態後端下的本地狀態會存在TaskManager堆內存中。後文會詳細講解RocksDB相關的調優項。

3.關於網絡緩存

需要特別注意的是,網絡緩存的佔用量與並行度和作業拓撲有關,而與實際網絡流量關係不大,所以不能簡單地以作業的數據量來設置這一區域。粗略地講,對簡單拓撲,建議以默認值啟動作業,再觀察該區域的利用情況並進行調整;對複雜拓撲,建議先適當調大t.m.network.fraction和max,保證不出現IOException: Insufficient number of network buffers異常,然後再做調整。另外,請一定不要把t.m.network.min和max設成相等的值,這樣會直接忽略fraction,而這種直接的設定往往並不科學。下一節就來詳細講解Flink網絡棧的調優。

02 網絡棧調優

2.1 網絡棧和網絡緩存

圖3 Flink網絡棧

Flink的網絡棧構建在Netty的基礎之上。如上圖所示,每個TaskManager既可以是Server(發送端)也可以是Client(接收端),並且它們之間的TCP連接會被複用,以減少資源消耗。

圖中的小色塊就是網絡緩存(NetworkBuffer),它是數據傳輸的最基本單位,以直接內存的形式分配,承載序列化的StreamRecord數據,且一個Buffer的大小就等於一個MemorySegment的大小(t.m.segment-size,默認32KB)。TM中的每個Sub-task都會創建網絡緩存池(NetworkBufferPool),用於分配和回收Buffer。下面講解一下網絡緩存的分配規則。

2.2 網絡緩存分配規則

Flink流作業的執行計劃用三層DAG來表示,即:StreamGraph(邏輯計劃)→ JobGraph(優化的邏輯計劃)→ ExecutionGraph(物理計劃)。當ExecutionGraph真正被調度到TaskManager上面執行時,形成的是如下圖所示的結構。

圖4 Flink物理執行圖結構

每個Sub-task都有一套用於數據交換的組件,輸出側稱為ResultPartition(RP),輸入側稱為InputGate(IG)。另外,它們還會根據並行度和上下游的DistributionPattern(POINTWISE或ALL_TO_ALL)劃分為子塊,分別稱為ResultSubpartition(RS)和InputChannel(IC)。注意上下游RS和IC的比例是嚴格1:1的。網絡緩存就是在ResultPartition和InputGate級別分配的,具體的分配規則是:

  • #Buffer-RP = #RS + 1 && #Buffer-RS <= t.network.m.max-buffers-per-channel (10)
  • #Buffer-IG = #IC * t.network.m.buffers-per-channel (2, exclusive) + t.network.m.floating-buffers-per-gate (8, floating)

<!---->

  •  

翻譯一下:

  • 發送端RP分配的Buffer總數為RS的數量+1,且為了防止傾斜,每個RS可獲得的Buffer數不能多於taskmanager.network.memory.max-buffers-per-channel(默認值10);
  • 接收端每個IC獨享的Buffer數為taskmanager. network. memory. buffers- per- channel(默認值2),IG可額外提供的浮動Buffer數為taskmanager. network. memory. floating- buffers- per- gate(默認值8)。

多説一句,上圖這套機制也是Flink實現Credit-based流控(反壓)的基礎,想想診斷反壓時會看的**PoolUsage參數就明白了。反壓是比較基礎的話題,這裏就不再展開。

再重複上一節的那句話:網絡緩存的佔用量與並行度和作業拓撲有關,而與實際網絡流量關係不大。特別地,由於ALL_TO_ALL分佈(如Hash、Rebalance)會產生O(N^2)級別的RS和IC,所以對Buffer的需求量也就更大。當然,我們基本不可能通過用肉眼看複雜的拓撲圖來計算Buffer數,所以最好的方法是快速試錯,來看一個例子。

2.3 網絡緩存調優示例

本節以測試環境中的某作業(下稱“示例作業”)為例。

該作業有54個8C / 16G規格的TM,並行度400,運行4330個Sub-tasks,且包含大量的keyBy操作。初始設定t.m.network.fraction = 0.2 & t.m.network.max = 3GB,報IOException: Insufficient network buffers異常;再次設定t.m.network.fraction = 0.3 & t.m.network.max = 5GB,作業正常啟動,實際分配4.32GB,佔用率73%~78%之間浮動(參見之前的Web UI圖)。這個分配情況相對於原作業的fraction = 0.5 & min = max = 8GB顯然是更優的。

有的同學可能會問:空閒的Network區域內存不能挪作他用嗎?答案是否定的。在作業啟動時,Network區域的全部內存都會初始化成Buffer,並按上一節所述的配額分配到RP和IG,Web UI中Netty Shuffle Buffers → Available一欄的Buffer基本可以認為被浪費了。所以,當作業遇到瓶頸時,盲目增大網絡緩存對吞吐量有害無益。

2.4 容易忽略的緩存超時

網絡緩存在發送端被Flush到下游有三種時機:Buffer寫滿、超時時間到、遇到特殊標記(如Checkpoint Barrier)。之所以要設計緩存超時,是為了避免Buffer總是無法寫滿導致下游處理延遲。可以通過StreamExecutionEnvironment#setBufferTimeout方法或者execution.buffer-timeout參數來設置緩存超時,默認100ms,一般無需更改。

圖5 緩存的填充與發送

但是,考慮大並行度、大量ALL_TO_ALL交換的作業,數據相對分散,每個ResultSubpartition的Buffer並不會很快填滿,大量的Flush操作反而會無謂地佔用CPU。此時可以考慮適當增大緩存超時,降低Flush頻率,能夠有效降低CPU Usage。以前述作業為例,將緩存超時設為500ms,其他參數不變,穩定消費階段TM的平均CPU Usage降低了40%,效果拔羣。當然這仍是以下游延遲作為trade-off的,故時效性極敏感的作業不適用於此優化。

2.5 網絡容錯

平台採用Flink on Kubernetes的部署方式,但是Kubernetes網絡虛擬化(Calico、Flannel等)會損失網絡性能,故對於大流量或複雜作業,務必提高網絡容錯性。以下是三個相關的參數。

1.taskmanager.network.request-backoff.max

默認值10000(社區版)/ 60000(平台),表示下游InputChannel請求上游ResultSubpartition的指數退避最大時長,單位為毫秒。如果請求失敗,會拋出
PartitionNotFoundException: Partition xx@host not found,應適當調大,如240000。注意此報錯與Kafka Partition無關,切勿混淆。

2.akka.ask.timeout

默認值10s(社區版)/ 60s(平台),表示Akka Actor的Ask RPC等待返回結果的超時。如果網絡擁塞或者拓撲過於複雜,就會出現AskTimeoutException: Ask timed out on Actor akka://xx after xx ms的信息,應調大此值,如120s。注意長時間GC也可能導致此問題,留心排查。

3.heartbeat.timeout

默認值50000,表示JobManager和TaskManager之間心跳信號的發送/接收超時,單位為毫秒。與akka.ask.timeout同理,若出現TimeoutException: Heartbeat of TaskManager with id xx timed out,建議適當調大。

03 RocksDB與狀態調優

3.1 Flink中的FRocksDB

圖6 FRocksDB讀寫流程

Flink RocksDB狀態後端採用的是名為FRocksDB的分支版本,由Ververica維護。它的讀寫流程與原版基本相同,如上圖所示,MemTable和BlockCache分別就是讀寫緩存和讀緩存。特別地,由於Flink在每個Checkpoint週期都會將RocksDB的數據快照持久化到文件系統,所以不需要寫預寫日誌(WAL)。

TM中的每個Slot都擁有一個RocksDB實例,且傳統方式下每個列族(CF)都對應一套MemTable、BlockCache和SST。而在Flink作業中申請的一個StateHandle——即Runtime Context# get... State (State Descriptor) ——就對應一個取StateDescriptor名稱的列族。顯然,同一作業內StateDescriptor的名稱不能重複。

3.2 RocksDB託管內存機制

上述傳統方式有個明顯的缺點,即RocksDB的內存幾乎不受控(因為Flink並不限制用户能申請多少個StateHandle)。因此,Flink在1.10版本藉助RocksDB 5.6+提出的WriteBufferManager和LRUCache協同機制,實現了全託管的RocksDB內存管理,如下圖所示。

圖7 全託管RocksDB內存管理

託管內存機制默認啟用(state. backend. rocksdb. memory. managed = true),此時TM會將整塊Managed Memory區域作為所有RocksDB實例共用的BlockCache,並通過WriteBufferManager將MemTable的內存消耗向BlockCache記賬(即寫入只有size信息的dummy塊),從而BlockCache能夠感知到全部的內存使用並施加限制,避免OOM發生。SST索引和Bloom Filter塊則會進入BlockCache的高優先級區。需要注意,由於歷史原因以及Iterator-pinned Blocks的存在,BlockCache在少數情況下不能嚴格限制內存,故有必要配置一些JVM Overhead作為兜底。

託管內存默認在各個Slot之間平均分配,用户也可以通過
s.b.r.memory.fixed-per-slot參數來為每個Slot手動設定託管內存配額,但一般不推薦。除此之外,可調整的兩個參數如下。

  • s.b.r.memory.write-buffer-ratio:MemTable內存佔託管內存的比例,默認值0.5;

<!---->

  • s.b.r.memory.high-prio-pool-ratio:高優先級區內存佔託管內存的比例,默認值0.1。

剩餘的部分(默認0.4)就是留給數據BlockCache的配額。用户一般不需要更改它們,若作業狀態特別重讀或重寫,可適當調整,但必須先保證託管內存充足。

3.3 其他RocksDB參數

**
1.s.b.r.checkpoint.transfer.thread.num(默認1)**

每個有狀態算子在Checkpoint時傳輸數據的線程數,增大此值會對網絡和磁盤吞吐量有更高要求。一般建議4~8,1.13版本中默認已改為4。

**
2.s.b.r.timer-service.factory(社區版默認ROCKSDB,平台默認HEAP)**

Timer相關狀態存儲的位置,包含用户註冊的Timer和框架內部註冊的Timer(如Window、Trigger)。若存儲在堆中,則Timer狀態做CP時無法異步Snapshot,所以Timer很多的情況下存在RocksDB內更好。但美中不足的是,設置為ROCKSDB會有一個極偶發的序列化bug,導致無法從Savepoint恢復狀態,若不能接受,建議HEAP。

**
3.s.b.r.predefined-options(默認DEFAULT)**

社區提供的預設RocksDB調優參數集,有4種:DEFAULT、SPINNING_DISK_OPTIMIZED、
SPINNING_DISK_OPTIMIZED_HIGH_MEM、FLASH_SSD_OPTIMIZED(名稱都很self-explanatory)。該參數容易忽略,但強烈建議設置,比起默認值均有不錯的性能收益。若單個Slot的狀態量達到GB級別,且託管內存充裕,設為SPINNING_DISK_OPTIMIZED_HIGH_MEM最佳。其他情況設為SPINNING_DISK_OPTIMIZED即可。

除了上述參數之外,原則上建議遵循RocksDB Wiki的忠告("No need to tune it unless you see an obvious performance problem"),不再手動調整RocksDB高級參數(如s.b.r.{block | writebuffer | compaction}.*),除非出現了託管內存機制無法解決的問題。筆者也將部分高級參數列出如下,供參考。

圖8 RocksDB高級參數

注意劃線的項會被託管內存機制覆蓋掉。如果經過慎重思考,必須fine tune RocksDB,則需要將s.b.r.memory.managed設為false,同時用户要承擔可能的OOM風險。

3.4 RocksDB監控 & 調優示例

在大狀態作業正式上線之前,應打開一部分必要的RocksDB監控,觀察是否有性能瓶頸。開啟監控對狀態讀寫性能有一定影響,一般建議如下6項:

  • s.b.r.metrics.{block-cache-capacity | block-cache-usage | cur-size-all-mem-tables | mem-table-flush-pending | num-running-flushes | num-running-compactions} = true

觀察完畢並解決問題後,請務必關閉它們。

圖9 示例作業RocksDB監控

上圖是示例作業的部分RocksDB Metrics圖表,比較正常。如果在穩定消費階段,Flush和Compaction等重量級操作特別頻繁,以至於圖中的點連成線,一般就提示RocksDB遇到了瓶頸。但是託管內存(即BlockCache)佔用100%是正常現象,基本不必擔心。

作為參考,該作業的增量Checkpoint大小在15G左右,每日攝入數十億條狀態數據,設置參數為:t. m. managed. fraction = 0.25(實際分配託管內存3.6G),s. b. r. predefined- options = SPINNING_ DISK_ OPTIMIZED,s. b. r. checkpoint. transfer. thread. num = 8。表現良好。而調優前作業的t. m. managed. fraction是默認的0.1,並且還對RocksDB高級參數做了一些無謂的修改,性能表現不佳。

3.5 狀態TTL

RocksDB的狀態TTL需要藉助CompactionFilter實現,如下圖所示。

圖10 狀態TTL原理

用户調用State Ttl Config# cleanupIn Rocksdb Compact Filter (N)方法,就可以設定在訪問狀態N次後,更新CompactionFilter記錄的時間戳。當SST執行Compaction操作時,會根據該時間戳檢查狀態鍵值對是否過期並刪除掉。注意若訪問狀態非常頻繁,N值應適當調大(默認僅為1000),防止影響Compaction性能。

3.6 狀態縮放與最大並行度

當作業的並行度改變並從CP / SP恢復時,就會涉及狀態縮放的問題。Flink內Keyed State數據以KeyGroup為單位組織,每個key經過兩重Murmur Hash計算出它應該落在哪個KeyGroup中,同時每個Sub-task會分配到一個或多個KeyGroup。如下圖所示,並行度變化只會影響KeyGroup的分配,可以將狀態恢復的過程近似化為順序讀,提高效率。

圖11 Keyed State的縮放

KeyGroup的數量與最大並行度相同,而最大並行度改變會導致作業無法從CP / SP恢復,所以要謹慎設定。如果用户沒有顯式設置,就會根據以下規則來推算:

128 <= round Up To Power Of Two (operator Parallelism * 1.5) <= 32768

顯然這並不安全。假設一個作業的並行度是200,推算的最大並行度是512;若將其並行度提升至400,推算的最大並行度就會變成1024。所以總是推薦顯式設置合理的最大並行度。

3.7 狀態本地恢復

狀態本地恢復默認關閉,可以通過設置
state.backend.local-recovery = true啟用,但它只能作用於Aligned Checkpoint和Keyed State。啟用後,每次CP產生兩份快照:Primary(遠端DFS)和Secondary(本地磁盤),且Secondary CP失敗不會影響整個CP流程。作業恢復時,首先嚐試從有效的Secondary快照恢復狀態,能顯著提高恢復速度。如果Secondary快照不可用或不完整,再fallback到Primary恢復。如下圖所示。

圖12 狀態本地恢復

狀態本地恢復會引入額外的磁盤消耗:非增量CP會導致磁盤佔用量翻倍;增量CP由於原生存在引用計數機制,不會多消耗空間,但因為數據比較分散,IOPS會相應增加。

04 其他調優項

4.1 Checkpoint相關

讀者應該很熟悉Checkpoint相關的配置項了,這裏只提兩點:一是checkpointTimeout根據作業特性設置,但不要過長,防止CP卡死掩蓋作業本身的問題(如數據傾斜);二是一定要設置
minPauseBetweenCheckpoints,避免算子一直處在CP過程中導致性能下降。示例作業的設置是:checkpointInterval = 3min / checkpointTimeout = 15min / minPauseBetweenCheckpoints = 1min。

另外,在大狀態作業中碰到一種常見的現象,即Checkpoint全部ack之後卡在IN_PROGRESS,經過1~3分鐘左右才會變成COMPLETED,如下圖所示。

圖13 Checkpoint卡在IN_PROGRESS狀態的現象

這是因為TaskManager和HDFS之間通信不暢,或者是HDFS本身的壓力導致數據塊寫入失敗。而Flink必須保證Checkpoint的完整性,即重試到所有快照數據都成功寫入才能標記為COMPLETED。讀者可在TM日誌中發現形如Exception in createBlockOutputStream: Connect timed out的異常信息。

4.2 對象重用

對象重用在Flink配置中不是很起眼,但卻相當有用。Flink在生成JobGraph時會將符合一定條件的算子組合成算子鏈(OperatorChain),所有chain在一起的Sub-task都會在同一個TM Slot中執行。而對象重用的本質就是在算子鏈內的下游算子中直接使用上游算子發射對象的淺拷貝。

圖14 算子鏈示意

如圖所示,若不啟用對象重用,算子鏈中的虛線默認是CopyingChainingOutput(深拷貝)。通過ExecutionConfig#enableObjectReuse()或者pipeline.object-reuse = true啟用對象重用,CopyingChainingOutput就會被替換為ChainingOutput(淺拷貝)。下圖示出了兩者之間的差異。

圖15 是否重用對象的區別

DataStream API作業一般不建議開啟對象重用,除非十分確定不存在下游算子直接修改上游算子發射的對象的情況。並且DataStream API作業開啟對象重用的收益不高,僅當其中有複雜數據類型定義時,才會有20%左右的性能提升。

但是SQL作業強烈建議開啟,因為Flink SQL的類型系統與DataStream API有差異,StringData、MapData等的深拷貝成本很大,並且Flink SQL的代碼生成器能夠保證可變對象的安全性。測試結果表明,對象重用的SQL作業平均可獲得翻倍的性能提升。

4.3 別忘了JobManager

相對於TaskManager,JobManager的配置往往比較省心,似乎隨便給個2C / 4G的配置就可以高枕無憂了。實際上JobManager內部維護的組件很多,如:作業DAG即{Job | Execution}Graph、SlotPool & Scheduler、<TaskManagerLocation, TaskExecutorGateway>的映射關係、CheckpointCoordinator、HeartbeatManager、ShuffleMaster、PartitionTracker等。

所以,如果作業Slot / Sub-task多,Checkpoint比較大,或者是重Shuffle的批作業,一定要適當增加JobManager的資源。最近作者部門有兩個作業頻繁出現ResourceManager leader changed to new address null的異常信息,就是因為JM壓力過大、GC時間太長,導致ZooKeeper Session失效了。以示例作業的JM(4C / 8G)為例,其內存分配如下。

圖16 示例作業JobManager內存分配

4.4 其他小Tips

  • 從Flink 1.12開始,默認的時間語義變成了事件時間。如果作業是處理時間語義,可以禁用水印發射,即:Execution Config# set Auto WatermarkInterval (0)
  • 設置metrics.latency.interval(單位毫秒)可以週期性插入LatencyMarker,用於測量各算子及全鏈路的延遲。處理LatencyMarker會佔用資源,因此不需要特別頻繁,60000左右比較合適。
  • 用户註冊的Timer會按照<key, timestamp>去重,並在內部以最小堆存儲。所以要儘量避免onTimer風暴,即大量key的Timer在同一個時間戳觸發,造成性能抖動。
  • 如果需要交換Flink原生沒有Serializer支持的數據類型(如HyperLogLog、RoaringBitmap),應在代碼中註冊自定義的Serializer,避免fallback到Kryo導致性能下降。
  • POJO類型支持狀態Schema變化,增刪字段不會影響恢復(新增的字段會以默認值初始化)。但是切記不能修改字段的數據類型以及POJO的類名。

05 References