知根知底:Flink-KafkaConsumer 詳解
Flink-Kafka Connector 是連線kafka 的聯結器,負責對接kafka 的讀寫, 本篇主要介紹kafka consumer 的執行流程與核心設計。
邏輯執行流程
-
分配當前task消費的partition與起始的offset : 根據從狀態中恢復的資料與客戶端指定的消費模式, 採取的方式是狀態中offset優先, 即從狀態中能夠找到對應的offset 就使用該offset , 否則就根據客戶端指定的方式
-
從kafka 中不斷拉取資料, 傳送到下游,並且儲存當前的offset
-
為了保證整個任務的全域性一致性,需要將offset 提交到狀態中
-
如果開啟了分割槽發現模式,那麼需要將檢測到新的分割槽新增到消費執行緒中。
兩個重要介面
Flink 保證全域性資料一致性是通過全域性狀態快照checkpoint 完成的, 也就是週期性的執行checkpoint 將當前的任務狀態儲存起來, Flink 在整個checkpoint 的執行過程中提供了兩個介面,方便使用者去做一些自定義的操作, 例如操作狀態、兩階段提交實現等等。
CheckpointedFunction介面
提供了initializeState方法與snapshotState方法,initializeState方法是在任務初始化時候執行,常見的就是獲取的checkpoint 中的狀態資料;snapshotState方法是在每次checkpoint觸發都會執行,常見的就是將資料存放在狀態物件中,以便能夠被持久化。
CheckpointListener介面
提供了notifyCheckpointComplete方法與notifyCheckpointAborted方法,這兩個方法都是在一次checkpoint 完成之後執行,那麼有可能是通知成功回撥(notifyCheckpointComplete)也有可能失敗回撥(notifyCheckpointAborted)。
具體實現
對於Flink 來說source端的標準對接介面是SourceFunction ,主要實現其run方法,在run 中執行資料的pull操作;另外為了保證整個狀態的一致性,在checkpoint時需要記錄checkpoint時的offset, 並且保證其失敗重啟時也能夠從checkpoint 記錄的offset開始消費, 因此同時實現了CheckpointedFunction介面與CheckpointListener介面,這兩個介面提供了可操作狀態的一些方法。
FlinkKafkaConsumerBase 實現 SourceFunction、CheckpointedFunction、 CheckpointListener 介面的抽象類,包含了整個流程的核心方法,如下:
initializeState
從checkpoint 中 恢復最近一次或者是指定批次checkpoint 中offset, 並將其存放在TreeMap<KafkaTopicPartition,Long> 結的 restoredState 物件中
open
主要作用就是分配當前task消費的partitioin 的offset 位置
1. partition 分配策略:姑且認為是當前task的下標與 partition%numTask 相等就分配給當前task
2. offset 分配策略:有狀態資料就使用狀態資料的offset ; 沒有就根據客戶端指定的StartupMode作為消費起點
run
開始消費kafka 中資料, 通過 KafkaFetcher 完成 :
1. 啟動了一個消費執行緒 KafkaConsumerThread 從kakfa 中拉取資料,將其儲存到 Handover 的next 物件中
2. 迴圈從Handover 的next 中獲取資料
3. 記錄下當前的offset, 更新到subscribedPartitionStates 中去
createAndStartDiscoveryLoop
在run 方法中被呼叫, 開啟了非同步分割槽發現的執行緒discoveryLoopThread,會按照指定的時間間隔檢查是否有新的分割槽(預設情況下不開啟), 當發現有新的分割槽時會將其新增到unassignedPartitionsQueue中, 以便被KafkaConsumerThread 執行緒檢測到
snapshotState
將記錄的subscribedPartitionStates 中消費進度資料寫入到 unionOffsetStates 狀態中與臨時物件pendingOffsetsToCommit中
notifyCheckpointComplete
提交offset 至kafka中:將pendingOffsetsToCommit 中記錄當前批次checkpoint 的offset 資料提交到kafka 中
核心流程
-
KakfaConsumerThread 執行緒不斷從Kafka 中消費資料
-
消費的資料儲存handover 中
-
kafkaFetch 不斷從handover 獲取資料進行處理
其他流程
-
initializeState、snapshotState 這兩個方法是實現了CheckpointedFunction接口裡面的對應方法, CheckpointedFunction 介面是Flink 提供的兩個hook, 任務初始化執行initializeState,用於從狀態中恢復資料, 優於open先執行, 用於其恢復offset資料;snapshotState 每次觸發checkpoint 時執行,提供使用者操作hook, 用於將offset 資料儲存在狀態中。
-
notifyCheckpointComplete 是實現了CheckpointListener 介面中的方法, checkpoint 完成之後的回撥方法, 提交狀態中的offset資料至kafka中 。
offset 提交
對於整個offset的提交至kafka中, 類似於兩階段的提交過程:
-
第一階段:執行checkpoint 時即呼叫snapshotState方法, offset 儲存到狀態中
-
第二階段:checkpoint 執行完成時回撥notifyCheckpointComplete方法,offset 提交到kafka中
對於第一階段失敗任務直接重啟,從最近一次checkpoint記錄的位點開始消費,對於第二階段提交offset至kafka如果失敗,並不會導致任務重啟,只是做了日誌記錄,因為提交offset到kafka成功與否並不會影響任務的執行。
啟動時offset指定
-
如果是從checkpoint 恢復,那麼就會忽略客戶端所指定的startMode , 也就是checkpoint 狀態資料優先
總結
本篇主要介紹了FlinkKafkaConsumer的核心設計流程與實現,同時介紹了與checkpoint流程結合完成offset的管理。