知根知底:Flink-KafkaConsumer 詳解

語言: CN / TW / HK

Flink-Kafka  Connector 是連線kafka 的聯結器,負責對接kafka 的讀寫, 本篇主要介紹kafka consumer 的執行流程與核心設計。

邏輯執行流程

  1. 分配當前task消費的partition與起始的offset : 根據從狀態中恢復的資料與客戶端指定的消費模式, 採取的方式是狀態中offset優先, 即從狀態中能夠找到對應的offset 就使用該offset , 否則就根據客戶端指定的方式

  2. 從kafka 中不斷拉取資料, 傳送到下游,並且儲存當前的offset

  3. 為了保證整個任務的全域性一致性,需要將offset 提交到狀態中

  4. 如果開啟了分割槽發現模式,那麼需要將檢測到新的分割槽新增到消費執行緒中。

兩個重要介面

Flink 保證全域性資料一致性是通過全域性狀態快照checkpoint 完成的, 也就是週期性的執行checkpoint 將當前的任務狀態儲存起來, Flink 在整個checkpoint 的執行過程中提供了兩個介面,方便使用者去做一些自定義的操作, 例如操作狀態、兩階段提交實現等等。

CheckpointedFunction介面

提供了initializeState方法與snapshotState方法,initializeState方法是在任務初始化時候執行,常見的就是獲取的checkpoint 中的狀態資料;snapshotState方法是在每次checkpoint觸發都會執行,常見的就是將資料存放在狀態物件中,以便能夠被持久化。

CheckpointListener介面

提供了notifyCheckpointComplete方法與notifyCheckpointAborted方法,這兩個方法都是在一次checkpoint 完成之後執行,那麼有可能是通知成功回撥(notifyCheckpointComplete)也有可能失敗回撥(notifyCheckpointAborted)。

具體實現

對於Flink 來說source端的標準對接介面是SourceFunction ,主要實現其run方法,在run 中執行資料的pull操作;另外為了保證整個狀態的一致性,在checkpoint時需要記錄checkpoint時的offset, 並且保證其失敗重啟時也能夠從checkpoint 記錄的offset開始消費, 因此同時實現了CheckpointedFunction介面與CheckpointListener介面,這兩個介面提供了可操作狀態的一些方法。

FlinkKafkaConsumerBase 實現 SourceFunction、CheckpointedFunction、 CheckpointListener 介面的抽象類,包含了整個流程的核心方法,如下:

initializeState

從checkpoint 中 恢復最近一次或者是指定批次checkpoint 中offset,  並將其存放在TreeMap<KafkaTopicPartition,Long> 結的 restoredState 物件中

open

主要作用就是分配當前task消費的partitioin 的offset 位置

1.  partition 分配策略:姑且認為是當前task的下標與 partition%numTask 相等就分配給當前task

2.  offset 分配策略:有狀態資料就使用狀態資料的offset ; 沒有就根據客戶端指定的StartupMode作為消費起點

run

開始消費kafka 中資料, 通過 KafkaFetcher 完成 : 

1.  啟動了一個消費執行緒 KafkaConsumerThread 從kakfa 中拉取資料,將其儲存到 Handover 的next 物件中

2.  迴圈從Handover 的next 中獲取資料

3.  記錄下當前的offset, 更新到subscribedPartitionStates 中去 

createAndStartDiscoveryLoop

在run 方法中被呼叫, 開啟了非同步分割槽發現的執行緒discoveryLoopThread,會按照指定的時間間隔檢查是否有新的分割槽(預設情況下不開啟),  當發現有新的分割槽時會將其新增到unassignedPartitionsQueue中, 以便被KafkaConsumerThread 執行緒檢測到

snapshotState

將記錄的subscribedPartitionStates 中消費進度資料寫入到 unionOffsetStates 狀態中與臨時物件pendingOffsetsToCommit中

notifyCheckpointComplete

提交offset 至kafka中:將pendingOffsetsToCommit 中記錄當前批次checkpoint 的offset 資料提交到kafka 中

核心流程

  1. KakfaConsumerThread 執行緒不斷從Kafka 中消費資料

  2. 消費的資料儲存handover 中

  3. kafkaFetch 不斷從handover 獲取資料進行處理

其他流程  

  1. initializeState、snapshotState 這兩個方法是實現了CheckpointedFunction接口裡面的對應方法, CheckpointedFunction 介面是Flink 提供的兩個hook, 任務初始化執行initializeState,用於從狀態中恢復資料, 優於open先執行, 用於其恢復offset資料;snapshotState 每次觸發checkpoint 時執行,提供使用者操作hook, 用於將offset 資料儲存在狀態中。

  2. notifyCheckpointComplete 是實現了CheckpointListener 介面中的方法, checkpoint 完成之後的回撥方法, 提交狀態中的offset資料至kafka中

offset 提交

對於整個offset的提交至kafka中, 類似於兩階段的提交過程:

  • 第一階段:執行checkpoint 時即呼叫snapshotState方法,  offset 儲存到狀態中

  • 第二階段:checkpoint 執行完成時回撥notifyCheckpointComplete方法,offset 提交到kafka中

對於第一階段失敗任務直接重啟,從最近一次checkpoint記錄的位點開始消費,對於第二階段提交offset至kafka如果失敗,並不會導致任務重啟,只是做了日誌記錄,因為提交offset到kafka成功與否並不會影響任務的執行。

啟動時offset指定

  • 如果是從checkpoint 恢復,那麼就會忽略客戶端所指定的startMode , 也就是checkpoint 狀態資料優先

總結

本篇主要介紹了FlinkKafkaConsumer的核心設計流程與實現,同時介紹了與checkpoint流程結合完成offset的管理。