Flink 非對齊Unaligned的checkpoint(原始碼分析)
本文原始碼基於flink1.14
在幫助使用者排查任務的時候,經常會發現部分task處理的慢,在Exactly once語義時需要等待快照的對齊而白白柱塞的情況
在flink1.11版本引入了非對齊的checkpoint,來解決這種柱塞問題,所以來看看這個新特性的原始碼是如何實現的
先看下官網的圖來總的說下實現原理,再來看看原始碼
flink是基於Chandy-Lamport演算法來實現全域性快照的,其核心就是在資料中間穿插barrier
當一個task上游同一批次所有的barrier到齊時,就可以觸發快照狀態的儲存了,問題就是出在這裡,等待對齊
來看下上面官網的圖,是官網優化的一個具體思路
當某task的第一個barrier到時,那內部當前批次的狀態必然是不完整的,那多久才算完整呢,等到這批checkpoint的資料全部都到齊都處理完,那狀態就完整了
那當第一個barrier到的時候,剩下沒到的資料在哪呢,答案就是,上游task的outBuffer(ResultSubpartition)和自己inBuffer(inputGates)裡面
ok.分析到這裡就可以看flink的思路了,如果當第一個barrier來的時候我不能觸發checkpoint, 是因為還有部分資料沒有處理到
那乾脆就直接把這部分還沒處理的資料(在buffer裡面的資料),連同狀態資料一起儲存到checkpoint裡面不就行了嗎 ???
在從checkpoint恢復的時候就先把這部分buffer資料, 先恢復到當前task的buffer裡面,繼續計算就可以了,其實弱化了每個checkpoint批次的概念
這樣一來當收到第一個barrier的時候,就可以直接觸發checkpoint了
下面就是來看看Flink 原始碼的實現了
看下熟悉的StreamTask因為barrier在flink裡面依然被當做資料的一種,在主迴圈裡面看看接收到以後做了什麼
先看輸入inputBuffer的儲存
在AbstractStreamTaskNetworkInput中接收資料的時候從inputGate拉取資料的時候
可以看到會根據資料的型別,如果是barrier型別會走到processBarrier方法
注意這裡的這個barrierHandler是SingleCheckpointBarrierHandler實現類,因為非對齊模式的話收到第一個barrier就觸發checkpoint了,所以也等同於sigle了
這裡的state是實現類AlternatingWaitingForFirstBarrierUnaligned是非對齊模式特有的
來看看怎麼處理的
可以看到在 SubtaskCheckpointCoordinatorImpl 中會準備inputFlight資料的快照,目的肯定就是為了保持到checkpoint中
這個方法prepareInflightDataSnapshot方法看下
會呼叫 BiFunctionWithException prepareInputSnapshot這個action物件,這個物件從哪裡傳進來的呢?
原來在StreamTask建構函式的時候就通過自己的prepareInputSnapshot方法來建立這個Function了
來看下這個方法的邏輯
會遍歷所有的inputProcess然後呼叫它的準備快照方法
這個方法裡面具體
就將具體的input的資料儲存到state裡面去了
input的儲存就說完了
接著來看output快取資料的儲存
會到最開始的AlternatingWaitingForFirstBarrierUnaligned類當儲存完input buffer的資料以後initInputsCheckpoint方法之後
來看下後面的邏輯
當觸發完input資料的儲存以後,就是觸發全域性的checkpoint了,這裡會一直走到streamTask的 triggerCheckpointOnBarrier在裡面會走到 performCheckpoint
最後在SubtaskCheckpointCoordinatorImpl類中
最後在 BufferWritingResultPartition 類裡面
PipelinedSubpartition呼叫addbuffer然後將channelStateWriter.addOutputData把output buffer的資料儲存到狀態裡面去了
講完觸發checkpoint儲存快取中的資料,接下來就是從chekpoint恢復的時候怎麼恢復這些未處理的資料了
來看下StreamTask如果從chekpoint恢復的是否是如何處理的
restore方法呼叫了restoreGates
這裡就是將資料in buffer狀態的儲存到InputGate, 然後out buffer 的狀態資料儲存到ResultPartitionWriter裡面去,繼續處理了
over
- 詳解CVE-2022-0847 DirtyPipe漏洞
- 用「閃電俠」的例子解釋一下程序和執行緒
- Fluent-Validator 業務校驗器
- Java 統計新客戶
- 【Java面試】Redis存線上程安全問題嗎?為什麼?
- 其實 Gradle Transform 就是個紙老虎 —— Gradle 系列(4)
- drools的型別宣告(Type declarations)
- JavaScript中if語句優化和部分語法糖小技巧推薦
- Spring Boot 微信小程式_儲存微信登入者的個人資訊
- Arthas常用功能及一次線上問題排查
- 用 Go 快速開發一個 RESTful API 服務
- GitHub 畢業年鑑「GitHub 熱點速覽 v.22.20」
- docker 1.2 之docker基本用法
- Linux-Mycat實現MySQL的讀寫分離
- 工具14Finger-全能web指紋識別與分享平臺
- CMake技術總結
- vue - Vue路由(擴充套件)
- C# WPF後臺動態新增控制元件(經典)
- WPF中的依賴屬性
- 關於Spring中的useSuffixPatternMatch