2萬字50張圖玩轉Flink面試體系
大家好,我是老兵。
本系列為大資料技術棧面試體系
系列,每期將分享一個技術元件的知識全體系,並結合面試的形式由淺入深講解。
本期將介紹大資料實時計算利器Flink面試體系,全文內容已製作成PDF。
一 基礎篇
1 簡單介紹下Flink及使用場景
Apache Flink是開源的大資料實時計算框架,具有分散式、高效能、記憶體計算等特點。Flink因其獨特的流批一體
設計模式,被廣泛應用於實時
和離線
資料應用場景。
Flink被稱為第四代大資料計算引擎,在其前面存在Mapreduce、Storm、Spark等計算框架。在流處理領域中,Flink是目前最全面、最強大的實時計算引擎。
結合官網的示意圖,我們來看下Flink的工作場景。
- 資料來源:支援多種資料來源接入。包含事務型資料庫、日誌、IOT裝置、點選事件等資料。
- 處理層:基於Yarn|K8s排程引擎和HDFS|S3儲存元件,提供完整的
事件驅動
、時間語義
、流&批一體
的Flink計算服務。 - 應用層:輸出端提供應用系統、事件日誌、儲存系統等資料對接。
2 Flink程式設計模型瞭解嗎
1)Flink分層模型
Flink底層通過封裝和抽象,提供四級分層程式設計模型,以此支撐業務開發實時和批處理程式。
結合示意圖,我們由下而上進行介紹。
Runtime層:
Flink程式的最底層入口。提供基礎的核心介面完成流、狀態、事件、時間等複雜操作,功能靈活但使用成本較高,一般面向原始碼研發人員。DataStream/Dataset API層:
這一層主要面向開發者。基於Runtime層抽象為兩類API,其中DataStream API處理實時流程式;Dataset API處理批資料程式。Table API:
統一DataStream/DataSet API,抽象成帶有Schema資訊的表結構API。通過Table操作和登錄檔完成資料計算,支援與DataStream/Dataset相互轉換。SQL:
面向資料分析和開發人員,抽象為SQL操作,降低開發門檻和平臺化。
2)Flink計算模型
Flink的計算模型和Spark的模型有些類似。包含輸入端(source)、轉換(Transform)、輸出端(sink)。
source端
:Flink程式的輸入端,支援多個數據源對接Transformation
:Flink程式的轉換過程,實現DataStream/Dataset的計算和轉換sink端
: Flink的輸出端,支援內部和外部輸出源
具體的Flink計算模型(運算元)詳情,可以參考我的文章:一網打盡Flink運算元大全
3 聊聊Flink的工作原理
主要考察對Flink的內部執行機制的瞭解程度,需要重點注意Flink中的重要角色元件及其協作機制。
Flink底層執行分為客戶端
(Client)、Job管理器
(JobManager)、任務執行器
(TaskManager)三種角色元件。其中Client負責Job提交;JobManager負責協調Job執行和Task任務分配;TaskManager負責Task任務執行。
Flink常見執行流程如下(排程器不同會有所區別):
- 1)使用者提交流程式Application。
- 2)Flink
解析StreamGraph
。Optimizer
和Builder模組解析程式程式碼,生成初始StreamGraph
並提交至Client
。 - 3)Client
生成JobGraph
。上述StreamGraph由一系列operator chain構成,在client中會被轉換為JobGraph
,即優化多個chain為一個節點,最終提交到JobManager
。 - 4)JobManager
排程Job
。JobManager和Client的ActorSystem
保持通訊,並生成ExecutionGraph
(並行化JobGraph)。隨後Schduler和Coordinator模組協調並排程Jobz執行。 - 5)TaskManager
部署Task
。TaskManager
和JobManager
的ActorSystem
保持通訊,接受job排程計劃
並在內部劃分TaskSlot
部署執行Task任務
。 - 6)Task
執行
。Task執行期間,JobManager、TaskManager和Client之間保持通訊,回傳任務狀態
和心跳資訊
,監控任務執行。
4 公司怎麼提交Flink實時任務的?談談流程
顧名思義,這裡涉及Flink的部署模式內容。一般Flink部署模式除了Standalone之外,最常見的為Flink on Yarn和Flink on K8s模式,其中Flink on Yarn模式在企業中應用最廣。
Flink on Yarn模式細分由可以分為Flink session、Flink per-job和Flink application模式,下面我們逐一說明。
1)Flink session模式
Flink Session模式會首先啟動一個叢集
,按照配置約定,叢集中包含一定數量的JobManager
和TaskManager
。後面所有提交的Flink Job均共享該叢集內的JobManager和TaskManager,即所有的Flink Job競爭相同資源。
這樣做的好處是節省作業提交資源開銷(叢集已存在),減少資源和執行緒切換工作。但是所有作業共享一個JobManager,導致JobManager
壓力激增,同時一旦某Job發生故障時會影響到其他作業(中斷或重啟)。一般僅適用於短週期
、小容量
作業。
看下Flink-session模式的作業提交流程:
- (1)整體流程分為兩部分:yarn-session叢集啟動、Job提交。
- (2)
yarn-session叢集啟動
。請求YarnRM
啟動JobManager
,隨後JobManager內部啟動Dispatcher
和Flink-yarnRM
程序,等待後續Job提交
。 - (3)Client提交Job。Client連線
Dispatcher
開始提交Job,包含jars
和解析過的JobGraph
拓撲資料結構。 - (4)Dispatcher啟動
JobMaster
,JobMaster向Yarn RM請求slots資源
。 - (5)
Flink-Yarn RM
向Yarn RM請求Container
資源,準備啟動TaskManager。 - (6)Yarn啟動
TaskManager程序
。TaskManager同時向Flink RM反向註冊(自身可用的slots槽數) - (7)TaskManager為新的作業提供slots,與JobMaster通訊。
- (8)JobMaster將執行的
任務分發
給TaskManager,開始部署執行任務
2)Flink Per-job模式
Flink Per-job模式為每個提交的作業啟動叢集,各叢集間相互獨立,並在各自作業完成後銷燬,最大限度保障資源隔離。每個Job均衡分發
自身的JobManager,單獨進行job的排程和執行。
雖然該模式提供了資源隔離,但是每個job均維護一個叢集,啟動、銷燬以及資源請求消耗時間長,因此比較適用於長時間的任務執行(批處理任務)。
Per-job模式在Flink 1.15中棄用,目前推薦使用applicaiton模式。
看下Flink Per-job模式的作業提交流程:
- (1)首先Client提交作業到
YarnRM
,包括jars和JobGraph
等資訊。 - (2)YarnRM分配Container啟動
AppMaster
。AppMaster中啟動JobManager
和FlinkRM
,並將作業提交給JobMaster
。 - (3)JobMaster向YarnRM請求資源(
slots
)。 - (4)FlinkRM向
YarnRM
請求container
並啟動TaskManager
。 - (6)TaskManager啟動之後,向
FlinkRM
註冊自己的可用任務槽。 - (7)TaskManager向FlinkRM反向
註冊
(自身可用的slots槽數
) - (8)TaskManager為新的作業提供
slots
,與JobMaster通訊。 - (9)JobMaster將執行的
任務分發
給TaskManager,開始部署執行任務
3)Flink application模式
Flink application模式綜合Per-job和session的優點,為每個·Application·建立獨立的叢集(JobManager),允許每個Application中包含多個job作業提交(可開啟非同步提交),當application應用完成時叢集關閉。
該模式和前面兩種模式的最大區別是Main()方法此時在JobManager
中執行,即在JobManager中完成檔案下載
、jobGraph解析
、提交資源
等事項。前面兩種模式的main()方法在Client端執行,該模式將大大減少Client壓力。
看下Flink application模式的作業提交流程:
- (1)流程與Per-job模式的提交流程非常相似。
- (2)提交Application。此時首先是提交整個Application應用,應用中包含多個Job。
- (3)每個Job啟動各自的JobManager,可選擇非同步啟動執行。
- (4)其餘步驟與Per-job模式類似,可參考上述步驟詳解。
5 K8s瞭解嗎?談談Flink on K8S的提交流程
由於目前雲原生和K8s容器化的快速發展,很多Flink程式開始轉向容器化部署。首先需要了解下K8s的相關知識,這是個加分項。
1)K8s容器編排技術
k8s全稱kubernete,是一種強大的
、可移植
的高效能容器編排工具
。這裡的容器指的是Docker容器化
技術,它通過將執行環境和配置打包成映象服務
,在任意環境下快速部署docker容器
,提供基礎的環境服務。解決之前部署服務速度慢、遷移難和高成本等問題。
由於Docker容器技術的普及,基於容器構建的雲原生架構越來越多,同時也帶來了很多容器運維管理問題。K8s提供了一套完整的容器編排解決方案,實現容器發現及排程
、負載均衡
、彈性擴容
和資料卷掛載
等服務。
2)Flink on K8s部署模式
整體過程和Flink on Yarn的提交模式比較類似,主要是環境切換成K8s,此時的TaskManager和JobManager等元件變成了K8s Pod角色(映象)。
首先提前定義各元件的服務配置檔案並提交到K8s叢集;K8s叢集會自動根據配置啟動相應的Pod服務,最後Flink程式開始執行。
session模式示例
- (1)K8s叢集根據提交的配置檔案啟動
K8sMaster
和TaskManager
(K8s Pod
物件) - (2)依次啟動Flink的
JobManager
、JobMaster
、Deploy
和K8sRM
程序(K8s Pod物件);過程中完成slots請求
和ExecutionGraph
的生成動作。 - (3)TaskManager註冊Slots、JobManager請求Slots並分配任務
- (4)部署Task執行並反饋狀態
6 Flink的執行圖有哪幾種?分別有什麼作用
Flink中的執行圖一般是可以分為四類,按照生成順序分別為:StreamGraph
-> JobGraph
-> ExecutionGraph
->物理執行圖
。
1)StreamGraph
顧名思義,這裡代表的是我們編寫的流程式圖。通過Stream API生成,這是執行圖的最原始拓撲資料結構。
2)JobGraph
StreamGraph在Client中經過運算元chain鏈合併等優化,轉換為JobGraph拓撲圖,隨後被提交到JobManager中。
3)ExecutionGraph
JobManager中將JobGraph進一步轉換為ExecutionGraph
,此時ExecutuonGraph根據運算元配置的並行度轉變為並行化的Graph拓撲結構。
4)物理執行圖
比較偏物理執行概念,即JobManager進行Job排程,TaskManager最終部署Task的圖結構。
7 說說Flink的視窗機制
Flink一般根據固定時間或長度把資料流切分到不同的視窗,並提供相應的視窗Window運算元,在視窗內進行聚合運算。
Flink的視窗一般分為三種類型:滾動視窗、滑動視窗、會話視窗和全域性視窗等。
滾動視窗
滑動視窗
會話視窗
Flink中的視窗運算元一般會配置Keyed型別資料集操作,並結合watermark和定時器,提供時間語義的統計,Windows運算元的定義如下:
- Windows Assigner:定義視窗的型別(資料流分配到多長時間間隔的哪種視窗),比如1min的滾動視窗。
- Trigger:指派觸發器,即視窗滿足什麼條件觸發
- Evictor:資料剔除(非必須)
- Lateness:是否處理延遲資料標誌,可在watermark之後再次觸發
- OutputTag:側輸出流輸出標籤,和getOutputTag配合使用。
- WindowFunction:windows內的處理邏輯(程式核心)
// 計算過去30s視窗的uv/pv
dataStream.keyBy(x => x.getString("position_id"))
.window(TumblingEventTimeWindows.of(Time.minutes(30)))
.aggregate(new PVResultFunc(), new UVResultFunc())
8 Flink的watermark水印瞭解嗎
Flink中的waternark(水印)是處理延遲資料的優化機制。一般資料順序進入系統,但是存在網路等外部因素導致資料亂序或者延遲達到,這部分資料即不能丟棄也不能無限等待,watermark的出現解決了這個兩難問題。
watermark的定義是:比如在一個視窗內,當位於視窗最小watermark
(水位線)的資料達到後,表明(約定)該視窗內的所有資料均已達到,此時不再等待資料,直接觸發視窗計算。
watermark:最新事件事件 - 固定時間間隔
1)watermark的作用
- 規定了資料延遲處理的最優判定,即watermark時間間隔
- 較為完善的處理了資料亂序的問題,從而輸出預期結果
- 結合最大延遲時間和側輸出流等機制,徹底解決資料延遲
2)watermark的生成
Flink中的watermark生成形式分為兩種,即PeriodicWatermarks(週期性的生成水印)、PunctuatedWatermarks(每條資訊/資料量生成水印)。
- AssignerWithPeriodicWatermarks
``` // 設定5s週期性生成watermark env.getConfig.setAutoWatermarkInterval(5000)
// 週期性生成watermark val periodicWatermarkStream = dataStream.assignTimestampsAndWatermarks(new XXPeriodicAssigner(10)) ```
- AssignerWithPunctuatedWatermarks
``` class xxx extends AssignerWithPunctuatedWatermarks[(String, Long, Int)] { override def extractTimestamp(element: (String, Long, Int), previousElementTimestamp: Long): Long = { element._2 }
override def checkAndGetNextWatermark(lastElement:(St ring, Long, Int), extractTimestamp: Long): Watermark = { // 判斷欄位狀態生成watermark if (lastElement._1 != 0) new Watermark(extractTimesta mp) else null } } ```
9 Flink分散式快照原理是什麼
分散式快照即所謂的一致性檢查點
(Checkpoints)。定義為某個時間點上所有任務狀態的一份拷貝(快照),該時間點也是所有任務剛好處理完一個相同資料的時間。
Flink間隔時間自動執行
一致性檢查點
程式,非同步插入barrier檢查點分界線,記憶體狀態儲存為cp程序檔案。
- 從
source
(Input)端開始,JobManager
會向每個source端傳送檢查點barrier訊息
並啟動檢查點checkpoints
。在保證所有的source端資料處理完成後,Flink開始儲存一致性檢查點checkpoints
,過程啟用barrier檢查點分界線。 - 接收資料和
barrier
訊息,兩個過程非同步進行。在所有的source資料都處理完成後,開始將自己的檢查點checkpoints儲存到狀態後端StateBackend
中,並通知JobManager將barrier分發到下游。 - barrier向下遊傳遞時會進行
barrier對齊
確認。待barrier都到齊後才進行checkpoints檢查點儲存。 - 重複以上操作,直到整個流程完成。
10 說說Flink的狀態機制
Flink重要的特性就是其支援有狀態計算。什麼是有狀態計算呢?即將中間的計算結果進行儲存,便於後面的資料回溯和計算。
這個很好理解,因為Flink一般使用場景大多數為視窗實時計算,計算的是即時資料,當存在一個計算曆史資料累計的需求時顯得捉襟見肘,因此需要有方法能夠保持前面的資料狀態。Flink的底層很多機制預設開啟了狀態管理,比如checkpoint過程、二階段提交均存在狀態儲存的操作。
在實際操作中Flink狀態分為Keyed State 與 Operator State。
1)Operator State
運算元狀態的作用範圍限定為運算元任務,同一並行任務的所有資料都可以訪問到相同的狀態。狀態對於同一任務而言是共享的。
- List State。列表狀態運算元,將狀態儲存為列表資料
- Union List State。聯合列表狀態運算元,與List State類似,但是當出現故障時可恢復。
- Broadcast State。廣播狀態運算元,即存在多個task任務共享狀態。
``` private var listState : ListState[Person] = _
override def open(parameters: Configuration): Unit = { val listStateDesc: ListStateDescriptor[Person] = new ListStateDescriptorPerson listState = getRuntimeContext.getListState(listStateDesc) } ```
2)Keyed State
顧名思義,此型別的State狀態儲存形式為K-V鍵值對,通過K值管理和維護狀態資料。
Flink對每個key維護自身狀態,相同Key的資料劃分到同一任務中,由Key管理其對應的狀態。
- Value State。值狀態運算元,將狀態儲存為K-單個值
- List State。和上面的List State類似,狀態被儲存為k-陣列列表
- Map State。對映狀態運算元,狀態被儲存為K-Map
- 聚合State。狀態儲存為Aggregating聚合操作列表
```
MapState
userMapState = getRuntimeContext().getMapState(
new MapStateDescriptor
11 說說Flink的記憶體管理是如何做的
在介紹記憶體管理之前,先介紹一下JVM中的堆記憶體和堆外記憶體。
通常來說。JVM堆
空間概念,簡單描述就是在程式中,關於物件例項|陣列的建立
、使用
和釋放
的記憶體,都會在JVM中的一塊被稱作為"JVM堆"記憶體區域內進行管理分配。
Flink程式在建立物件後,JVM會在堆內記憶體中
分配
一定大小的空間,建立Class物件
並返回物件引用,Flink儲存物件引用,同時記錄佔用的記憶體資訊。
而堆外記憶體如果你有過Java相關程式設計經歷的話,相信對堆外記憶體的使用並不陌生。其底層呼叫基於C
的JDK Unsafe類方法,通過指標
直接進行記憶體的操作,包括記憶體空間的申請、使用、刪除釋放等。
介紹完了堆內記憶體和堆外記憶體的概念,下面我們來看下Flink的記憶體管理。
1)JobManager記憶體管理
JobManager程序總記憶體包括JVM堆內記憶體、JVM堆外記憶體以及JVM MetaData記憶體,其中涉及的記憶體配置引數為:
```
JobManager總程序記憶體
jobmanager.memory.process.size:
作業管理器的 JVM 堆記憶體大小
jobmanager.memory.heap.size:
作業管理器的堆外記憶體大小。此選項涵蓋所有堆外記憶體使用。
jobmanager.memory.off-heap.size: ```
2)TaskManager記憶體管理
TaskManager記憶體同樣包含JVM堆內記憶體、JVM堆外記憶體以及JVM MetaData記憶體三大塊。其中JVM堆內記憶體又包含Framework Heap和Task Heap,即框架堆記憶體和任務Task堆記憶體。
JVM堆外記憶體包含Memory memory託管記憶體,主要用於儲存排序、結果快取、狀態後端資料等。另一塊為Direct Memory直接記憶體,包含如下:
- Framework Off-Heap Memory:Flink框架的堆外記憶體,即Flink中TaskManager的自身記憶體,和slot無關。
- Task Off-Heap:Task的堆外記憶體
- Network Memory:網路記憶體
其中涉及的記憶體配置引數為:
``` // tm的框架堆內記憶體 taskmanager.memory.framework.heap.size=
// tm的任務堆內記憶體 taskmanager.memory.task.heap.size
// Flink管理的原生託管記憶體 taskmanager.memory.managed.size= taskmanager.memory.managed.fraction=
// Flink 框架堆外記憶體 taskmanager.memory.framework.off-heap.size=
// Task 堆外記憶體 taskmanager.memory.task.off-heap.size=
// 網路資料交換所使用的堆外記憶體大小 taskmanager.memory.network.min: 64mb taskmanager.memory.network.max: 1gb taskmanager.memory.network.fraction: 0.1 ```
12 Flink和Spark Streaming有什麼區別
1)設計理念
- Spark是批處理框架,其中的SparkStreaming在Spark的基礎上實現的微批處理工作,支援秒級別延遲。
- Flink是徹底的流處理框架,可以處理有界流和無流資料,達到流批一體,延遲低,真正做到來一條資料立馬處理。
- spark本身是無狀態的,基於RDD計算。Flink基於事件驅動,既能進行有狀態計算,也可以進行無狀態計算。
2)流批一體
- Spark通過逼近最小微批的方式達到近實時的效果,本質上還是批處理。
- Flink本身內部就是處理無界的實時流,通過時間間隔限制,將無界流轉換為有界流,實現流批一體。
3)應用場景
- Spark擅長處理資料量非常大而且邏輯複雜的批資料處理、基於歷史資料的互動式查詢等
- Flink擅長處理低延遲實時資料處理場景,比如實時日誌報表分析等。
- Spark社群更為活躍,且生態比較豐富,特別是機器學習方面;Flink正在逐漸完善社群和生態影響力。
4)相同點
- 均提供統一的批處理和流處理API,支援高階程式語言和SQL
- 都基於記憶體計算,速度快
- 都支援Exactly-once一致性
- 都有完善的故障恢復機制
二 進階篇
13 Flink/Spark/Hive SQL的執行原理
這裡我把三個元件SQL執行原理放到了一起,通過對比加深一下印象。
1)Hive SQL的執行原理
Hive SQL是Hive提供的SQL查詢引擎,底層由MapReduce實現。Hive根據輸入的SQL語句執行詞法分析、語法樹構建、編譯、邏輯計劃、優化邏輯計劃以及物理計劃等過程,轉化為Map Task和Reduce Task最終交由Mapreduce
引擎執行。
- 執行引擎。具有mapreduce的一切特性,適合大批量資料離線處理,相較於Spark而言,速度較慢且IO操作頻繁
- 有完整的
hql
語法,支援基本sql語法、函式和udf - 對錶資料儲存格式有要求,不同儲存、壓縮格式效能不同
2)Spark SQL的執行原理
Spark SQL底層基於Spark
引擎,使用Antlr
解析語法,編譯生成邏輯計劃和物理計劃,過程和Hive SQL執行過程類似,只不過Spark SQL產生的物理計劃為Spark程式。
- 輸入編寫的Spark SQL
SqlParser
分析器。進行語法檢查、詞義分析,生成未繫結的Logical Plan邏輯計劃(未繫結查詢資料的元資料資訊,比如查詢什麼檔案,查詢那些列等)Analyzer
解析器。查詢元資料資訊並繫結,生成完整的邏輯計劃。此時可以知道具體的資料位置和物件,Logical Plan 形如from table -> filter column -> select 形式的樹結構Optimizer
優化器。選擇最好的一個Logical Plan,並優化其中的不合理的地方。常見的例如謂詞下推、剪枝、合併等優化操作Planner
使用Planing Strategies將邏輯計劃轉化為物理計劃,並根據最佳策略選擇出的物理計劃作為最終的執行計劃- 呼叫Spark Plan
Execution
執行引擎執行Spark RDD任務
3)Flink SQL的執行原理
Flink SQL的執行原理和Hive以及Spark SQL的執行原理大同小異,均存在解析、校驗、編譯生成語法樹、優化生成邏輯計劃等步驟。
- Parser:
SQL解析
。底層通過JavaCC解析SQ語法,並將SQL解析為未經校驗的AST語法樹。 - Validate:
SQL校驗
。這裡會校驗SQL的合法性,比如Schema、欄位、資料型別等是否合法(SQL匹配程度),過程需要與sql儲存的元資料結合查驗。 - Optimize:
SQL優化
。Flink內部使用多種優化器,將前面步驟的語法樹進一步優化,針對RelNode和生成的邏輯計劃,隨後生成物理執行計劃。 - Produce:
SQL生成
。將物理執行計劃生成在特定平臺的可執行程式。 - Execute:
SQL執行
。執行SQL得到結果。
14 Flink的背壓遇到過嗎?怎麼解決的
Flink背壓是生產應用中常見的情況,當程式存在資料傾斜、記憶體不足狀況經常會發生背壓,我將從如下幾個方面去分析。
1)Flink背壓表現
- 1)執行開始時正常,後面出現大量Task任務
等待
- 2)少量Task任務開始報
checkpoint
超時問題 - 3)大量Kafka資料堆積,無法消費
- 4)Flink UI的BackPressure頁面出現紅色
High
標識
2) 反壓一般有哪些情況
一般可以細分兩種情況:
當前Task
任務處理速度慢,比如task任務中呼叫演算法處理等複雜邏輯,導致上游申請不到足夠記憶體。下游Task
任務處理速度慢,比如多次collect()輸出到下游,導致當前節點無法申請足夠的記憶體。
3) 頻繁反壓的影響是什麼
頻繁反壓會導致流處理作業資料延遲增加,同時還會影響到Checkpoint
。
Checkpoint時需要進行Barrier
對齊,此時若某個Task出現反壓
,Barrier流動速度會下降,導致Checkpoint變慢甚至超時,任務整體也變慢。
長期或頻繁出現反壓才需要處理,如果由於
網路波動
或者GC
出現的偶爾反壓可以不必處理。
4)Flink的反壓機制
背壓時一般下游速度慢於上游速度,資料久積成疾
,需要做限流。但是無法提前預估下游實際速度,且存在網路波動情況。
需要保持上下游動態反饋,如果下游速度慢,則上游限速;否則上游提速。實現動態自動反壓的效果。
下面看下Flink內部是怎麼實現反壓機制的。
- 1)每個
TaskManager
維護共享Network BufferPool
(Task共享記憶體池),初始化時向Off-heap Memory
中申請記憶體。 - 2)每個Task建立自身的
Local BufferPool
(Task本地記憶體池),並和Network BufferPool交換記憶體。 - 3)上游
Record Writer
向 Local BufferPool申請buffer(記憶體)寫資料。如果Local BufferPool沒有足夠記憶體則向Network BufferPool
申請,使用完之後將申請的記憶體返回Pool
。 - 4)
Netty Buffer
拷貝buffer並經過Socket Buffer
傳送到網路,後續下游端按照相似機制處理。 - 5)當下遊申請buffer失敗時,表示當前節點
記憶體
不夠,則逐層傳送反壓訊號
給上游,上游慢慢停止資料傳送,直到下游再次恢復。
5)反壓如何處理
- 檢視Flink UI介面,定位哪些Task出現反壓問題
- 檢視程式碼和資料,檢查是否出現數據傾斜
- 如果發生資料傾斜,進行預聚合key或拆分資料
- 加大執行記憶體,調整併發度和分割槽數
- 其他方式。。。
由於篇幅有限,更多Flink反壓內容請檢視我的相關文章:萬字趣解Flink背壓
15 Flink的exactly-once怎麼保持
精準一次消費需要整個系統各環節均保持強一致性,包括可靠的資料來源端
(資料可重複讀取、不丟失) 、可靠的消費端
(Flink)、可靠的輸出端
(冪等性、事務)。
Flink保持精準一次消費主要依靠checkpoint一致性快照
和二階段提交
機制。
1)資料來源端
Flink內建FlinkKafkaConsumer
類,不依賴於 kafka 內建的消費組offset管理,在內部自行記錄並維護
kafka consumer 的offset
。
(1)管理offset(手動提交)並儲存到checkpoint中\ (2)FlinkKafkaConsumer API內部整合Flink的Checkpoint機制,自動實現精確一次的處理語義。
從原始碼中看到stateBackend
中把offset state恢復到restoredState,然後從fetcher拉取最新的offset資料,隨後將offset存入到stateBackend中;最後更新xcheckpoint。
2)Flink消費端
Flink內部採用一致性快照機制
來保障Exactly-Once的一致性語義。
通過間隔時間自動執行一致性檢查點
(Checkpoints)程式,b並非同步插入barrier檢查點分界線。整個流程所有的operator均會進行barrier對齊
->資料完成確認
->checkpoints狀態儲存
,從而保證資料被精確一次處理。
3)輸出端
Flink內建二階段事務提交
機制和目標源支援冪等寫入。
冪等
寫入就是多次寫入會產生相同的結果,結果具有不可變性。在Flink中saveAsTextFile
運算元就是一種比較典型的冪等寫入。
二階段提交
則對於每個checkpoint建立事務,先預提交資料到sink中,然後等所有的checkpoint全部完成後再真正提交請求到sink, 並把狀態改為已確認,從而保證資料僅被處理一次。
為checkpoint建立事務,等到所有的checkpoint全部真正的完成後,才把計算結果寫入到sink中。
16 Flink怎麼處理遲到資料
- Flink內建watermark機制,可在一定程度上允許資料延遲
- 程式可在watermark的基礎上再配置最大延遲時間
- 開啟側輸出流,將延遲的資料輸出到側輸出流
- 程式內部控制,延遲過高的資料單獨進行後續處理
17 談談Flink的雙流JOIN
Flink雙流JOIN主要分為兩大類。一類是基於原生State的Connect運算元操作,另一類是基於視窗的JOIN操作。其中基於視窗的JOIN可細分為window join
和interval join
兩種。
實現原理:底層原理依賴Flink的
State狀態儲存
,通過將資料儲存到State中進行關聯join, 最終輸出結果。
1)基於Window Join的雙流JOIN實現機制
通俗理解,將兩條實時流中元素分配到同一個時間視窗中完成Join。兩條實時流資料快取在Window State
中,當視窗觸發計算時執行join操作。
- join運算元操作
兩條流資料按照關聯主鍵在(滾動、滑動、會話)視窗內進行inner join
, 底層基於State儲存,並支援處理時間和事件時間兩種時間特徵,看下原始碼:
windows視窗、state儲存和雙層for迴圈執行join()實現雙流JOIN操作,但是此時僅支援inner join型別。
- coGroup運算元操作
coGroup運算元也是基於window視窗機制,不過coGroup運算元比Join運算元更加靈活,可以按照使用者指定的邏輯匹配左流或右流資料並輸出,達到left join和right join的目的。
orderDetailStream
.coGroup(orderStream)
.where(r -> r.getOrderId())
.equalTo(r -> r.getOrderId())
.window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
.apply(new CoGroupFunction<OrderDetail, Order, Tuple2<String, Long>>() {
@Override
public void coGroup(Iterable<OrderDetail> orderDetailRecords, Iterable<Order> orderRecords, Collector<Tuple2<String, Long>> collector) {
for (OrderDetail orderDetaill : orderDetailRecords) {
boolean flag = false;
for (Order orderRecord : orderRecords) {
// 右流中有對應的記錄
collector.collect(new Tuple2<>(orderDetailRecords.getGoods_name(), orderDetailRecords.getGoods_price()));
flag = true;
}
if (!flag) {
// 右流中沒有對應的記錄
collector.collect(new Tuple2<>(orderDetailRecords.getGoods_name(), null));
}
}
}
})
.print();
2)基於Interval Join的雙流JOIN實現機制
Interval Join根據右流相對左流偏移的時間區間(interval
)作為關聯視窗,在偏移區間視窗中完成join操作。
滿足資料流stream2在資料流stream1的 interval
(low, high)偏移區間內關聯join。interval越大,關聯上的資料就越多,超出interval的資料不再關聯。
實現原理:interval join也是利用Flink的state儲存資料,不過此時存在state失效機制
ttl
,觸發資料清理操作。
val env = ...
// kafka 訂單流
val orderStream = ...
// kafka 訂單明細流
val orderDetailStream = ...
orderStream.keyBy(_.1)
// 呼叫intervalJoin關聯
.intervalJoin(orderDetailStream._2)
// 設定時間上限和下限
.between(Time.milliseconds(-30), Time.milliseconds(30))
.process(new ProcessWindowFunction())
class ProcessWindowFunction extends ProcessJoinFunction...{
override def processElement(...) {
collector.collect((r1, r2) => r1 + " : " + r2)
}
}
訂單流在流入程式後,等候(low,high)時間間隔內的訂單明細流資料進行join, 否則繼續處理下一個流。interval join目前也僅支援inner join。
3)基於Connect的雙流JOIN實現機制
對兩個DataStream執行connect操作,將其轉化為ConnectedStreams, 生成的Streams可以呼叫不同方法在兩個實時流上執行,且雙流之間可以共享狀態。
兩個資料流被connect之後,只是被放在了同一個流中,內部依然保持各自的資料和形式,兩個流相互獨立。
[DataStream1, DataStream2] -> ConnectedStreams[1,2]
我們可以在Connect運算元底層的ConnectedStreams中編寫程式碼,自行實現雙流JOIN的邏輯處理。
- 1)呼叫connect運算元,根據orderid進行分組,並使用process運算元分別對兩條流進行處理。
orderStream.connect(orderDetailStream)
.keyBy("orderId", "orderId")
.process(new orderProcessFunc());
- 2)process方法內部進行狀態程式設計, 初始化訂單、訂單明細和定時器的ValueState狀態。
```
private ValueState
// 初始化狀態Value
orderState = getRuntimeContext().getState(
new ValueStateDescriptor
- 3)為每個進入的資料流儲存state狀態並建立定時器。在時間視窗內另一個流達到時進行join並輸出,完成後刪除定時器。
@Override
public void processElement1(Order value, Context ctx, Collector<Tuple2<Order, OrderDetail>> out){
if (orderDetailState.value() == null){
//明細資料未到,先把訂單資料放入狀態
orderState.update(value);
//建立定時器,60秒後觸發
Long ts = (value.getEventTime()+10)*1000L;
ctx.timerService().registerEventTimeTimer(
ts);
timeState.update(ts);
}else{
//明細資料已到,直接輸出到主流
out.collect(new Tuple2<>(value,orderDetailS
tate.value()));
//刪除定時器
ctx.timerService().deleteEventTimeTimer
(timeState.value());
//清空狀態,注意清空的是支付狀態
orderDetailState.clear();
timeState.clear();
}
}
...
@Override
public void processElement2(){
...
}
- 4)未及時達到的資料流觸發定時器輸出到側輸出流,左流先到而右流未到,則輸出左流,反之輸出右連流。
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<Order, OrderDetail>> out) {
// 實現左連線
if (orderState.value() != null){
ctx.output(new OutputTag<String>("left-jo
in") {},
orderState.value().getTxId());
// 實現右連線
}else{
ctx.output(new OutputTag<String>("left-jo
in") {},
orderDetailState.value().getTxId());
}
orderState.clear();
orderDetailState.clear();
timeState.clear();
}
4)Flink雙流JOIN問題處理總結
- 1)為什麼我的雙流join時間到了卻不觸發,一直沒有輸出
檢查一下
watermark
的設定是否合理,資料時間
是否遠遠大於watermark和視窗時間,導致視窗資料經常為空
- 2)state資料儲存多久,會記憶體爆炸嗎
state自帶有
ttl機制
,可以設定ttl過期策略,觸發Flink清理過期state資料。建議程式中的state資料結構
用完後手動clear掉。
- 3)我的雙流join傾斜怎麼辦
join傾斜三板斧: 過濾異常key、拆分表減少資料、打散key分佈。當然可以的話我建議加記憶體!加記憶體!加記憶體!!
- 4)想實現多流join怎麼辦
目前無法一次實現,可以考慮先union然後再二次處理;或者先進行connnect操作再進行join操作,僅建議~
- 5)join過程延遲、沒關聯上的資料會丟失嗎
這個一般來說不會,join過程可以使用側輸出流儲存延遲流;如果出現節點網路等異常,Flink checkpoint也可以保證資料不丟失。
由於篇幅有限,更多Flink雙流JOIN內容請檢視我的相關文章:萬字直通Flink雙流JOIN面試
18 Flink資料傾斜遇到過嗎?怎麼處理的
資料傾斜一般都是資料Key分配不均,比如某一型別key數量過多,導致shuffle過程分到某節點資料量過大,記憶體無法支撐。
1)資料傾斜可能的情況
那我們怎麼發現數據傾斜了呢?一般是監控某任務Job執行情況,可以去Yarn UI或者Flink UI觀察,一般會出現如下狀況:
- 發現某subTask執行時間過慢
- 傳輸資料量和其他task相差過大
- BackPressure頁面出現反壓問題(紅色High標識)
結合以上的排查定位到具體的task中執行的運算元,一般常見於Keyed型別運算元:比如groupBy()、rebance()等產生shuffle過程的操作。
2)資料傾斜的處理方法
- 資料拆分。如果能定位的資料傾斜的key,總結其規律特徵。比如發現包含某字元,則可以在程式碼中把該部分資料key拆分出來,單獨處理後拼接。
- key二次聚合。兩次聚合,第一次將key加字首聚合,分散單點壓力;隨後去除字首後再次聚合,得到最終結果。
- 調整引數。加大TaskManager記憶體、keyby均衡等引數,一般效果不是很好。
- 自定義分割槽或聚合邏輯。繼承分區劃分、聚合計算介面,根據資料特徵和自定義邏輯,調整資料分割槽並均勻打散資料key。
19 Flink資料重複怎麼辦
一般來說Flink可以開啟exactly-once機制,可保證精準一次消費。但是如果存在資料處理過程異常導致資料重複,可以藉助一些工具或者程式來處理。
建議資料量不大的話可以使用flink自身的state或者藉助bitmap結構;稍微大點可以用布隆過濾器或hyperlog工具;其次使用外部介質(redis或hbase)設計好key就行自動去重,只不過會增加處理過程。
總結一下Flink的去重方式:
- 記憶體去重。採用
Hashset
等資料結構,讀取資料中類似主鍵等唯一性標識欄位,在記憶體中儲存並進行去重判斷。 - 使用
Redis Key
去重。藉助Redis的Hset
等特殊資料型別,自動完成Key去重。 - DataFrame/SQL場景,使用
group by
、over()
、window
開窗等SQL函式去重 - 利用groupByKey等聚合運算元去重
20 聊聊公司的Flink實時數倉架構,為什麼這麼設計
實時數倉資料規整為層級儲存,每層獨立加工。整體遵循由下向上建設思想,最大化資料賦能。
1)數倉分層設計
-
資料來源: 分為
日誌資料
和業務資料
兩大類,包括結構化和非結構化資料。 -
數倉型別:根據及時性分為
離線
數倉和實時
數倉 -
技術棧:
- 採集(Sqoop、Flume、CDC)
- 儲存(Hive、Hbase、Mysql、Kafka、資料湖)
- 加工(Hive、Spark、Flink)
- OLAP查詢(Kylin、Clickhous、ES、Dorisdb)等。
2)數倉架構設計
整體採用Lambda架構。保留實時、離線兩條處理流程,即最終會同時構建實時數倉和離線數倉。
1. 技術實現
- 使用Flink和Kafka、Hive為主要技術棧
- 實時技術流程。通過實時採集程式同步資料到Kafka訊息佇列
- Flink實時讀取Kafka資料,回寫到
kafka ods
貼源層topic - Flink實時讀取Kafka的ods層資料,進行實時清洗和加工,結果寫入到
kafka dwd
明細層topic - 同樣的步驟,Flink讀取dwd層資料寫入到
kafka dws
彙總層topic - 離線技術流程和前面章節一致
- 實時olap引擎查詢分析、報表展示
2. 優缺點
- 兩套技術流程,全面保障實時性和歷史資料完整性
- 同時維護兩套技術架構,維護成本高,技術難度大
- 相同資料來源處理兩次且儲存兩次,產生大量資料冗餘和操作重複
- 容易產生資料不一致問題
3)資料流程設計
整體從上而下,資料經過採集
-> 數倉明細加工
、彙總
-> 應用
步驟,提供實時數倉服務。
這裡列舉使用者分析的資料流程和技術路線:
採集使用者行為資料,統計使用者曝光點選資訊,構建使用者畫像。
電商實時數倉使用者分析資料流程
》》更多好文,請關注gzh:大資料兵工廠