面試指北 | 圖解 Flink 的 Checkpoint 機制

語言: CN / TW / HK

Flink是一個分散式的流處理引擎,而流處理的其中一個特點就是7X24。那麼,如何保障Flink作業的持續執行呢?Flink的內部會將應用狀態(state)儲存到本地記憶體或者嵌入式的kv資料庫(RocksDB)中,由於採用的是分散式架構,Flink需要對本地生成的狀態進行持久化儲存,以避免因應用或者節點機器故障等原因導致資料的丟失,Flink是通過checkpoint(檢查點)的方式將狀態寫入到遠端的持久化儲存,從而就可以實現不同語義的結果保障。通過本文,你可以瞭解到什麼是全域性一致性檢查點,Flink內部如何通過檢查點實現Exactly Once的結果保障。

什麼是Checkpoint(檢查點)

為了保證state容錯,Flink提供了處理故障的措施,這種措施稱之為checkpoint(一致性檢查點)。checkpoint是Flink實現容錯的核心功能,主要是週期性地觸發checkpoint,將state生成快照持久化到外部儲存系統(比如HDFS)。這樣一來,如果Flink程式出現故障,那麼就可以從上一次checkpoint中進行狀態恢復,從而提供容錯保障。另外,通過checkpoint機制,Flink可以實現Exactly-once語義(Flink內部的Exactly-once,關於端到端的exactly_once,Flink是通過兩階段提交協議實現的)。下面將會詳細分析Flink的checkpoint機制。

檢查點的生成

如上圖,輸入流是使用者行為資料,包括購買(buy)和加入購物車(cart)兩種,每種行為資料都有一個偏移量,統計每種行為的個數。

第一步:JobManager checkpoint coordinator 觸發checkpoint。

第二步:假設當消費到[cart,3]這條資料時,觸發了checkpoint。那麼此時資料來源會把消費的偏移量3寫入持久化儲存。

第三步:當寫入結束後,source會將state handle(狀態儲存路徑)反饋給JobManager的checkpoint coordinator。

第四步:接著運算元count buy與count cart也會進行同樣的步驟

第五步:等所有的運算元都完成了上述步驟之後,即當 Checkpoint coordinator 收集齊所有 task 的 state handle,就認為這一次的 Checkpoint 全域性完成了,向持久化儲存中再備份一個 Checkpoint meta 檔案,那麼整個checkpoint也就完成了,如果中間有一個不成功,那麼本次checkpoin就宣告失敗。

檢查點的恢復

通過上面的分析,或許你已經對Flink的checkpoint有了初步的認識。那麼接下來,我們看一下是如何從檢查點恢復的。

  • 任務失敗

  • 重啟作業

  • 恢復檢查點

  • 繼續處理資料

上述過程具體總結如下:

  • 第一步:重啟作業

  • 第二步:從上一次檢查點恢復狀態資料

  • 第三步:繼續處理新的資料

Flink內部Exactly-Once實現

Flink提供了精確一次的處理語義,精確一次的處理語義可以理解為:資料可能會重複計算,但是結果狀態只有一個。Flink通過Checkpoint機制實現了精確一次的處理語義,Flink在觸發Checkpoint時會向Source端插入checkpoint barrier,checkpoint barriers是從source端插入的,並且會向下遊運算元進行傳遞。checkpoint barriers攜帶一個checkpoint ID,用於標識屬於哪一個checkpoint,checkpoint barriers將流邏輯是哪個分為了兩部分。對於雙流的情況,通過barrier對齊的方式實現精確一次的處理語義。

關於什麼是checkpoint barrier,可以看一下CheckpointBarrier類的原始碼描述,如下:

/**
 * Checkpoint barriers用來在資料流中實現checkpoint對齊的.
 * Checkpoint barrier由JobManager的checkpoint coordinator插入到Source中,
 * Source會把barrier廣播發送到下游運算元,當一個運算元接收到了其中一個輸入流的Checkpoint barrier時,
 * 它就會知道已經處理完了本次checkpoint與上次checkpoint之間的資料.
 *
 * 一旦某個運算元接收到了所有輸入流的checkpoint barrier時,
 * 意味著該運算元的已經處理完了截止到當前checkpoint的資料,
 * 可以觸發checkpoint,並將barrier向下遊傳遞
 *
 * 根據使用者選擇的處理語義,在checkpoint完成之前會快取後一次checkpoint的資料,
 * 直到本次checkpoint完成(exactly once)
 *
 * checkpoint barrier的id是嚴格單調遞增的
 *
 */
    public class CheckpointBarrier extends RuntimeEvent {...}

可以看出checkpoint barrier主要功能是實現checkpoint對齊的,從而可以實現Exactly-Once處理語義。

下面將會對checkpoint過程進行分解,具體如下:

圖1,包括兩個流,每個任務都會消費一條使用者行為資料(包括購買(buy)和加購(cart)),數字代表該資料的偏移量,count buy任務統計購買行為的個數,coun cart統計加購行為的個數。

圖2,觸發checkpoint,JobManager會向每個資料來源傳送一個新的checkpoint編號,以此來啟動檢查點生成流程。

  • 圖3,當Source任務收到訊息後,會停止發出資料,然後利用狀態後端觸發生成本地狀態檢查點,並把該checkpoint barrier以及checkpoint id廣播至所有傳出的資料流分割槽。狀態後端會在checkpoint完成之後通知任務,隨後任務會向Job Manager傳送確認訊息。在將checkpoint barrier發出之後,Source任務恢復正常工作。

  • 圖4,Source任務發出的checkpoint barrier會發送到與之相連的下游運算元任務,當任務收到一個新的checkpoint barrier時,會繼續等待其他輸入分割槽的checkpoint barrier到來,這個過程稱之為 barrier 對齊 ,checkpoint barrier到來之前會把到來的資料線快取起來。

  • 圖5,任務收齊了全部輸入分割槽的checkpoint barrier之後,會通知狀態後端開始生成checkpoint,同時會把checkpoint barrier廣播至下游運算元。

  • 圖6,任務在發出checkpoint barrier之後,開始處理因barrier對齊產生的快取資料,在快取的資料處理完之後,就會繼續處理輸入流資料。

  • 圖7,最終checkpoint barrier會被傳送到sink端,sink任務接收到checkpoint barrier之後,會向其他運算元任務一樣,將自身的狀態寫入checkpoint,之後向Job Manager傳送確認訊息。Job Manager接收到所有任務返回的確認訊息之後,就會將此次檢查點標記為完成。

使用案例

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


// checkpoint的時間間隔,如果狀態比較大,可以適當調大該值
env.enableCheckpointing(1000);
// 配置處理語義,預設是exactly-once
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 兩個checkpoint之間的最小時間間隔,防止因checkpoint時間過長,導致checkpoint積壓
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// checkpoint執行的上限時間,如果超過該閾值,則會中斷checkpoint
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 最大並行執行的檢查點數量,預設為1,可以指定多個,從而同時出發多個checkpoint,提升效率
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 設定週期性外部檢查點,將狀態資料持久化到外部系統中,
// 使用該方式不會在任務正常停止的過程中清理掉檢查點資料
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

總結

本文首先從Flink的狀態入手,以圖解加文字的形式詳細解釋了Flink的checkpoint機制,並給出了使用Checkpoint時的程式配置。