面試指北 | 圖解 Flink 的 Checkpoint 機制
Flink是一個分散式的流處理引擎,而流處理的其中一個特點就是7X24。那麼,如何保障Flink作業的持續執行呢?Flink的內部會將應用狀態(state)儲存到本地記憶體或者嵌入式的kv資料庫(RocksDB)中,由於採用的是分散式架構,Flink需要對本地生成的狀態進行持久化儲存,以避免因應用或者節點機器故障等原因導致資料的丟失,Flink是通過checkpoint(檢查點)的方式將狀態寫入到遠端的持久化儲存,從而就可以實現不同語義的結果保障。通過本文,你可以瞭解到什麼是全域性一致性檢查點,Flink內部如何通過檢查點實現Exactly Once的結果保障。
什麼是Checkpoint(檢查點)
為了保證state容錯,Flink提供了處理故障的措施,這種措施稱之為checkpoint(一致性檢查點)。checkpoint是Flink實現容錯的核心功能,主要是週期性地觸發checkpoint,將state生成快照持久化到外部儲存系統(比如HDFS)。這樣一來,如果Flink程式出現故障,那麼就可以從上一次checkpoint中進行狀態恢復,從而提供容錯保障。另外,通過checkpoint機制,Flink可以實現Exactly-once語義(Flink內部的Exactly-once,關於端到端的exactly_once,Flink是通過兩階段提交協議實現的)。下面將會詳細分析Flink的checkpoint機制。
檢查點的生成
如上圖,輸入流是使用者行為資料,包括購買(buy)和加入購物車(cart)兩種,每種行為資料都有一個偏移量,統計每種行為的個數。
第一步:JobManager checkpoint coordinator 觸發checkpoint。
第二步:假設當消費到[cart,3]這條資料時,觸發了checkpoint。那麼此時資料來源會把消費的偏移量3寫入持久化儲存。
第三步:當寫入結束後,source會將state handle(狀態儲存路徑)反饋給JobManager的checkpoint coordinator。
第四步:接著運算元count buy與count cart也會進行同樣的步驟
第五步:等所有的運算元都完成了上述步驟之後,即當 Checkpoint coordinator 收集齊所有 task 的 state handle,就認為這一次的 Checkpoint 全域性完成了,向持久化儲存中再備份一個 Checkpoint meta 檔案,那麼整個checkpoint也就完成了,如果中間有一個不成功,那麼本次checkpoin就宣告失敗。
檢查點的恢復
通過上面的分析,或許你已經對Flink的checkpoint有了初步的認識。那麼接下來,我們看一下是如何從檢查點恢復的。
-
任務失敗
-
重啟作業
-
恢復檢查點
-
繼續處理資料
上述過程具體總結如下:
-
第一步:重啟作業
-
第二步:從上一次檢查點恢復狀態資料
-
第三步:繼續處理新的資料
Flink內部Exactly-Once實現
Flink提供了精確一次的處理語義,精確一次的處理語義可以理解為:資料可能會重複計算,但是結果狀態只有一個。Flink通過Checkpoint機制實現了精確一次的處理語義,Flink在觸發Checkpoint時會向Source端插入checkpoint barrier,checkpoint barriers是從source端插入的,並且會向下遊運算元進行傳遞。checkpoint barriers攜帶一個checkpoint ID,用於標識屬於哪一個checkpoint,checkpoint barriers將流邏輯是哪個分為了兩部分。對於雙流的情況,通過barrier對齊的方式實現精確一次的處理語義。
關於什麼是checkpoint barrier,可以看一下CheckpointBarrier類的原始碼描述,如下:
/** * Checkpoint barriers用來在資料流中實現checkpoint對齊的. * Checkpoint barrier由JobManager的checkpoint coordinator插入到Source中, * Source會把barrier廣播發送到下游運算元,當一個運算元接收到了其中一個輸入流的Checkpoint barrier時, * 它就會知道已經處理完了本次checkpoint與上次checkpoint之間的資料. * * 一旦某個運算元接收到了所有輸入流的checkpoint barrier時, * 意味著該運算元的已經處理完了截止到當前checkpoint的資料, * 可以觸發checkpoint,並將barrier向下遊傳遞 * * 根據使用者選擇的處理語義,在checkpoint完成之前會快取後一次checkpoint的資料, * 直到本次checkpoint完成(exactly once) * * checkpoint barrier的id是嚴格單調遞增的 * */ public class CheckpointBarrier extends RuntimeEvent {...}
可以看出checkpoint barrier主要功能是實現checkpoint對齊的,從而可以實現Exactly-Once處理語義。
下面將會對checkpoint過程進行分解,具體如下:
圖1,包括兩個流,每個任務都會消費一條使用者行為資料(包括購買(buy)和加購(cart)),數字代表該資料的偏移量,count buy任務統計購買行為的個數,coun cart統計加購行為的個數。
圖2,觸發checkpoint,JobManager會向每個資料來源傳送一個新的checkpoint編號,以此來啟動檢查點生成流程。
-
圖3,當Source任務收到訊息後,會停止發出資料,然後利用狀態後端觸發生成本地狀態檢查點,並把該checkpoint barrier以及checkpoint id廣播至所有傳出的資料流分割槽。狀態後端會在checkpoint完成之後通知任務,隨後任務會向Job Manager傳送確認訊息。在將checkpoint barrier發出之後,Source任務恢復正常工作。
-
圖4,Source任務發出的checkpoint barrier會發送到與之相連的下游運算元任務,當任務收到一個新的checkpoint barrier時,會繼續等待其他輸入分割槽的checkpoint barrier到來,這個過程稱之為 barrier 對齊 ,checkpoint barrier到來之前會把到來的資料線快取起來。
-
圖5,任務收齊了全部輸入分割槽的checkpoint barrier之後,會通知狀態後端開始生成checkpoint,同時會把checkpoint barrier廣播至下游運算元。
-
圖6,任務在發出checkpoint barrier之後,開始處理因barrier對齊產生的快取資料,在快取的資料處理完之後,就會繼續處理輸入流資料。
-
圖7,最終checkpoint barrier會被傳送到sink端,sink任務接收到checkpoint barrier之後,會向其他運算元任務一樣,將自身的狀態寫入checkpoint,之後向Job Manager傳送確認訊息。Job Manager接收到所有任務返回的確認訊息之後,就會將此次檢查點標記為完成。
使用案例
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// checkpoint的時間間隔,如果狀態比較大,可以適當調大該值
env.enableCheckpointing(1000);
// 配置處理語義,預設是exactly-once
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 兩個checkpoint之間的最小時間間隔,防止因checkpoint時間過長,導致checkpoint積壓
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// checkpoint執行的上限時間,如果超過該閾值,則會中斷checkpoint
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 最大並行執行的檢查點數量,預設為1,可以指定多個,從而同時出發多個checkpoint,提升效率
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 設定週期性外部檢查點,將狀態資料持久化到外部系統中,
// 使用該方式不會在任務正常停止的過程中清理掉檢查點資料
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
總結
本文首先從Flink的狀態入手,以圖解加文字的形式詳細解釋了Flink的checkpoint機制,並給出了使用Checkpoint時的程式配置。
- flink任務提交與執行2-ExecutionEnvironment初始化
- Flink 並行流中 watermark 機制無法觸發視窗計算的原因分析
- 兩萬字Flink筆記
- Flink SQL 知其所以然(二十三):SQL 的時區問題!
- Flink SQL 知其所以然(二十二):SQL 的時間語義!(建議收藏)
- 大資料Hadoop之——Flink的狀態管理和容錯機制(checkpoint)
- Flink SQL 知其所以然(二十一):SQL 資料型別大全!
- 開源專案丨Taier 1.1版本正式釋出,新增功能一覽為快
- 基於 FFI 的 PyFlink 下一代 Python 執行時介紹
- 說說FLINK細粒度滑動視窗如何處理
- Native Flink on Kubernetes 在小紅書的實踐
- Flink SQL 知其所以然:核心思想之動態表 & 連續查詢!
- 官宣|Apache Flink 1.15 釋出公告
- Flink Metrics&REST API 介紹和原理解析
- 大資料Hadoop之——實時計算流計算引擎Flink(Flink環境部署)
- B站基於AIFlow Flink在批流融合排程上的實踐
- 基於 Flink 構建實時數倉場景化實踐
- 重磅!流式資料庫新星 RisingWave 是下一個 Apache Flink 嗎?
- 基於 Flink Hudi 的實時數倉在 Shopee 的實踐
- 知根知底:Flink-KafkaConsumer 詳解