2萬字50張圖玩轉Flink面試體系

語言: CN / TW / HK

大家好,我是老兵。

本系列為大資料技術棧面試體系系列,每期將分享一個技術元件的知識全體系,並結合面試的形式由淺入深講解。

本期將介紹大資料實時計算利器Flink面試體系,全文內容已製作成PDF。

一 基礎篇

1 簡單介紹下Flink及使用場景

Apache Flink是開源的大資料實時計算框架,具有分散式、高效能、記憶體計算等特點。Flink因其獨特的流批一體設計模式,被廣泛應用於實時離線資料應用場景。

Flink被稱為第四代大資料計算引擎,在其前面存在Mapreduce、Storm、Spark等計算框架。在流處理領域中,Flink是目前最全面、最強大的實時計算引擎。

結合官網的示意圖,我們來看下Flink的工作場景。

  • 資料來源:支援多種資料來源接入。包含事務型資料庫、日誌、IOT裝置、點選事件等資料。
  • 處理層:基於Yarn|K8s排程引擎和HDFS|S3儲存元件,提供完整的事件驅動時間語義流&批一體的Flink計算服務。
  • 應用層:輸出端提供應用系統、事件日誌、儲存系統等資料對接。

2 Flink程式設計模型瞭解嗎

1)Flink分層模型

Flink底層通過封裝和抽象,提供四級分層程式設計模型,以此支撐業務開發實時和批處理程式。

結合示意圖,我們由下而上進行介紹。

  • Runtime層: Flink程式的最底層入口。提供基礎的核心介面完成流、狀態、事件、時間等複雜操作,功能靈活但使用成本較高,一般面向原始碼研發人員。
  • DataStream/Dataset API層:這一層主要面向開發者。基於Runtime層抽象為兩類API,其中DataStream API處理實時流程式;Dataset API處理批資料程式。
  • Table API:統一DataStream/DataSet API,抽象成帶有Schema資訊的表結構API。通過Table操作和登錄檔完成資料計算,支援與DataStream/Dataset相互轉換。
  • SQL:面向資料分析和開發人員,抽象為SQL操作,降低開發門檻和平臺化。

2)Flink計算模型

Flink的計算模型和Spark的模型有些類似。包含輸入端(source)、轉換(Transform)、輸出端(sink)。

  • source端:Flink程式的輸入端,支援多個數據源對接
  • Transformation:Flink程式的轉換過程,實現DataStream/Dataset的計算和轉換
  • sink端: Flink的輸出端,支援內部和外部輸出源

具體的Flink計算模型(運算元)詳情,可以參考我的文章:一網打盡Flink運算元大全

3 聊聊Flink的工作原理

主要考察對Flink的內部執行機制的瞭解程度,需要重點注意Flink中的重要角色元件及其協作機制。

Flink底層執行分為客戶端(Client)、Job管理器(JobManager)、任務執行器(TaskManager)三種角色元件。其中Client負責Job提交;JobManager負責協調Job執行和Task任務分配;TaskManager負責Task任務執行。

Flink基礎工作原理:Standalone模式 Flink常見執行流程如下(排程器不同會有所區別):

  • 1)使用者提交流程式Application。
  • 2)Flink解析StreamGraphOptimizer和Builder模組解析程式程式碼,生成初始StreamGraph並提交至Client
  • 3)Client生成JobGraph。上述StreamGraph由一系列operator chain構成,在client中會被轉換為JobGraph,即優化多個chain為一個節點,最終提交到JobManager
  • 4)JobManager排程Job。JobManager和Client的ActorSystem保持通訊,並生成ExecutionGraph(並行化JobGraph)。隨後Schduler和Coordinator模組協調並排程Jobz執行。
  • 5)TaskManager部署TaskTaskManagerJobManagerActorSystem保持通訊,接受job排程計劃並在內部劃分TaskSlot部署執行Task任務
  • 6)Task執行。Task執行期間,JobManager、TaskManager和Client之間保持通訊,回傳任務狀態心跳資訊,監控任務執行。

4 公司怎麼提交Flink實時任務的?談談流程

顧名思義,這裡涉及Flink的部署模式內容。一般Flink部署模式除了Standalone之外,最常見的為Flink on Yarn和Flink on K8s模式,其中Flink on Yarn模式在企業中應用最廣。

Flink on Yarn模式細分由可以分為Flink session、Flink per-job和Flink application模式,下面我們逐一說明。

1)Flink session模式

Flink Session模式會首先啟動一個叢集,按照配置約定,叢集中包含一定數量的JobManagerTaskManager。後面所有提交的Flink Job均共享該叢集內的JobManager和TaskManager,即所有的Flink Job競爭相同資源。

這樣做的好處是節省作業提交資源開銷(叢集已存在),減少資源和執行緒切換工作。但是所有作業共享一個JobManager,導致JobManager壓力激增,同時一旦某Job發生故障時會影響到其他作業(中斷或重啟)。一般僅適用於短週期小容量作業。

看下Flink-session模式的作業提交流程:

  • (1)整體流程分為兩部分:yarn-session叢集啟動、Job提交。
  • (2)yarn-session叢集啟動。請求YarnRM啟動JobManager,隨後JobManager內部啟動DispatcherFlink-yarnRM程序,等待後續Job提交
  • (3)Client提交Job。Client連線Dispatcher開始提交Job,包含jars和解析過的JobGraph拓撲資料結構。
  • (4)Dispatcher啟動JobMaster,JobMaster向Yarn RM請求slots資源
  • (5)Flink-Yarn RM向Yarn RM請求Container資源,準備啟動TaskManager。
  • (6)Yarn啟動TaskManager程序。TaskManager同時向Flink RM反向註冊(自身可用的slots槽數)
  • (7)TaskManager為新的作業提供slots,與JobMaster通訊。
  • (8)JobMaster將執行的任務分發給TaskManager,開始部署執行任務

2)Flink Per-job模式

Flink Per-job模式為每個提交的作業啟動叢集,各叢集間相互獨立,並在各自作業完成後銷燬,最大限度保障資源隔離。每個Job均衡分發自身的JobManager,單獨進行job的排程和執行。

雖然該模式提供了資源隔離,但是每個job均維護一個叢集,啟動、銷燬以及資源請求消耗時間長,因此比較適用於長時間的任務執行(批處理任務)。

Per-job模式在Flink 1.15中棄用,目前推薦使用applicaiton模式。

看下Flink Per-job模式的作業提交流程:

  • (1)首先Client提交作業到YarnRM,包括jars和JobGraph等資訊。
  • (2)YarnRM分配Container啟動AppMaster。AppMaster中啟動JobManagerFlinkRM,並將作業提交給JobMaster
  • (3)JobMaster向YarnRM請求資源(slots)。
  • (4)FlinkRM向YarnRM請求container並啟動TaskManager
  • (6)TaskManager啟動之後,向FlinkRM註冊自己的可用任務槽。
  • (7)TaskManager向FlinkRM反向註冊(自身可用的slots槽數
  • (8)TaskManager為新的作業提供slots,與JobMaster通訊。
  • (9)JobMaster將執行的任務分發給TaskManager,開始部署執行任務

3)Flink application模式

Flink application模式綜合Per-job和session的優點,為每個·Application·建立獨立的叢集(JobManager),允許每個Application中包含多個job作業提交(可開啟非同步提交),當application應用完成時叢集關閉。

該模式和前面兩種模式的最大區別是Main()方法此時在JobManager中執行,即在JobManager中完成檔案下載jobGraph解析提交資源等事項。前面兩種模式的main()方法在Client端執行,該模式將大大減少Client壓力。

看下Flink application模式的作業提交流程:

  • (1)流程與Per-job模式的提交流程非常相似。
  • (2)提交Application。此時首先是提交整個Application應用,應用中包含多個Job。
  • (3)每個Job啟動各自的JobManager,可選擇非同步啟動執行。
  • (4)其餘步驟與Per-job模式類似,可參考上述步驟詳解。

5 K8s瞭解嗎?談談Flink on K8S的提交流程

由於目前雲原生和K8s容器化的快速發展,很多Flink程式開始轉向容器化部署。首先需要了解下K8s的相關知識,這是個加分項。

1)K8s容器編排技術

k8s全稱kubernete,是一種強大的可移植的高效能容器編排工具。這裡的容器指的是Docker容器化技術,它通過將執行環境和配置打包成映象服務,在任意環境下快速部署docker容器,提供基礎的環境服務。解決之前部署服務速度慢、遷移難和高成本等問題。

由於Docker容器技術的普及,基於容器構建的雲原生架構越來越多,同時也帶來了很多容器運維管理問題。K8s提供了一套完整的容器編排解決方案,實現容器發現及排程負載均衡彈性擴容資料卷掛載等服務。

2)Flink on K8s部署模式

整體過程和Flink on Yarn的提交模式比較類似,主要是環境切換成K8s,此時的TaskManager和JobManager等元件變成了K8s Pod角色(映象)。

首先提前定義各元件的服務配置檔案並提交到K8s叢集;K8s叢集會自動根據配置啟動相應的Pod服務,最後Flink程式開始執行。

session模式示例

session模式示例

  • (1)K8s叢集根據提交的配置檔案啟動K8sMasterTaskManagerK8s Pod物件)
  • (2)依次啟動Flink的JobManagerJobMasterDeployK8sRM程序(K8s Pod物件);過程中完成slots請求ExecutionGraph的生成動作。
  • (3)TaskManager註冊Slots、JobManager請求Slots並分配任務
  • (4)部署Task執行並反饋狀態

6 Flink的執行圖有哪幾種?分別有什麼作用

Flink中的執行圖一般是可以分為四類,按照生成順序分別為:StreamGraph-> JobGraph-> ExecutionGraph->物理執行圖

1)StreamGraph

顧名思義,這裡代表的是我們編寫的流程式圖。通過Stream API生成,這是執行圖的最原始拓撲資料結構。

2)JobGraph

StreamGraph在Client中經過運算元chain鏈合併等優化,轉換為JobGraph拓撲圖,隨後被提交到JobManager中。

3)ExecutionGraph

JobManager中將JobGraph進一步轉換為ExecutionGraph,此時ExecutuonGraph根據運算元配置的並行度轉變為並行化的Graph拓撲結構。

4)物理執行圖

比較偏物理執行概念,即JobManager進行Job排程,TaskManager最終部署Task的圖結構。

7 說說Flink的視窗機制

Flink一般根據固定時間或長度把資料流切分到不同的視窗,並提供相應的視窗Window運算元,在視窗內進行聚合運算。

Flink的視窗一般分為三種類型:滾動視窗、滑動視窗、會話視窗和全域性視窗等。

滾動視窗

滾動視窗

滑動視窗

滑動視窗

會話視窗

會話視窗

Flink中的視窗運算元一般會配置Keyed型別資料集操作,並結合watermark和定時器,提供時間語義的統計,Windows運算元的定義如下:

  • Windows Assigner:定義視窗的型別(資料流分配到多長時間間隔的哪種視窗),比如1min的滾動視窗。
  • Trigger:指派觸發器,即視窗滿足什麼條件觸發
  • Evictor:資料剔除(非必須)
  • Lateness:是否處理延遲資料標誌,可在watermark之後再次觸發
  • OutputTag:側輸出流輸出標籤,和getOutputTag配合使用。
  • WindowFunction:windows內的處理邏輯(程式核心)

// 計算過去30s視窗的uv/pv dataStream.keyBy(x => x.getString("position_id"))    .window(TumblingEventTimeWindows.of(Time.minutes(30)))   .aggregate(new PVResultFunc(), new UVResultFunc())

8 Flink的watermark水印瞭解嗎

Flink中的waternark(水印)是處理延遲資料的優化機制。一般資料順序進入系統,但是存在網路等外部因素導致資料亂序或者延遲達到,這部分資料即不能丟棄也不能無限等待,watermark的出現解決了這個兩難問題。

watermark的定義是:比如在一個視窗內,當位於視窗最小watermark(水位線)的資料達到後,表明(約定)該視窗內的所有資料均已達到,此時不再等待資料,直接觸發視窗計算。

watermark:最新事件事件 - 固定時間間隔

1)watermark的作用

  • 規定了資料延遲處理的最優判定,即watermark時間間隔
  • 較為完善的處理了資料亂序的問題,從而輸出預期結果
  • 結合最大延遲時間和側輸出流等機制,徹底解決資料延遲

2)watermark的生成

Flink中的watermark生成形式分為兩種,即PeriodicWatermarks(週期性的生成水印)、PunctuatedWatermarks(每條資訊/資料量生成水印)。

  • AssignerWithPeriodicWatermarks

``` // 設定5s週期性生成watermark env.getConfig.setAutoWatermarkInterval(5000)

// 週期性生成watermark val periodicWatermarkStream = dataStream.assignTimestampsAndWatermarks(new XXPeriodicAssigner(10)) ```

  • AssignerWithPunctuatedWatermarks

``` class xxx extends AssignerWithPunctuatedWatermarks[(String, Long, Int)] {    override def extractTimestamp(element: (String, Long,     Int), previousElementTimestamp: Long): Long = {       element._2     }

override def checkAndGetNextWatermark(lastElement:(St  ring, Long, Int), extractTimestamp: Long): Watermark = {     // 判斷欄位狀態生成watermark        if (lastElement._1 != 0) new Watermark(extractTimesta  mp) else null      } } ```

9 Flink分散式快照原理是什麼

分散式快照即所謂的一致性檢查點(Checkpoints)。定義為某個時間點上所有任務狀態的一份拷貝(快照),該時間點也是所有任務剛好處理完一個相同資料的時間。

Flink間隔時間自動執行一致性檢查點程式,非同步插入barrier檢查點分界線,記憶體狀態儲存為cp程序檔案。

  • source(Input)端開始,JobManager會向每個source端傳送檢查點barrier訊息並啟動檢查點checkpoints。在保證所有的source端資料處理完成後,Flink開始儲存一致性檢查點checkpoints,過程啟用barrier檢查點分界線。
  • 接收資料和barrier訊息,兩個過程非同步進行。在所有的source資料都處理完成後,開始將自己的檢查點checkpoints儲存到狀態後端StateBackend中,並通知JobManager將barrier分發到下游。
  • barrier向下遊傳遞時會進行barrier對齊確認。待barrier都到齊後才進行checkpoints檢查點儲存。
  • 重複以上操作,直到整個流程完成。

10 說說Flink的狀態機制

Flink重要的特性就是其支援有狀態計算。什麼是有狀態計算呢?即將中間的計算結果進行儲存,便於後面的資料回溯和計算。

這個很好理解,因為Flink一般使用場景大多數為視窗實時計算,計算的是即時資料,當存在一個計算曆史資料累計的需求時顯得捉襟見肘,因此需要有方法能夠保持前面的資料狀態。Flink的底層很多機制預設開啟了狀態管理,比如checkpoint過程、二階段提交均存在狀態儲存的操作。

在實際操作中Flink狀態分為Keyed State 與 Operator State。

1)Operator State

運算元狀態的作用範圍限定為運算元任務,同一並行任務的所有資料都可以訪問到相同的狀態。狀態對於同一任務而言是共享的。

  • List State。列表狀態運算元,將狀態儲存為列表資料
  • Union List State。聯合列表狀態運算元,與List State類似,但是當出現故障時可恢復。
  • Broadcast State。廣播狀態運算元,即存在多個task任務共享狀態。

``` private var listState : ListState[Person] = _

override def open(parameters: Configuration): Unit = {     val listStateDesc: ListStateDescriptor[Person] = new ListStateDescriptorPerson     listState = getRuntimeContext.getListState(listStateDesc) } ```

2)Keyed State

顧名思義,此型別的State狀態儲存形式為K-V鍵值對,通過K值管理和維護狀態資料。

Flink對每個key維護自身狀態,相同Key的資料劃分到同一任務中,由Key管理其對應的狀態。

  • Value State。值狀態運算元,將狀態儲存為K-單個值
  • List State。和上面的List State類似,狀態被儲存為k-陣列列表
  • Map State。對映狀態運算元,狀態被儲存為K-Map
  • 聚合State。狀態儲存為Aggregating聚合操作列表

``` MapState userMapState; 

userMapState = getRuntimeContext().getMapState(     new MapStateDescriptor(     "Usercount",Long.class,Long.class)); ```

11 說說Flink的記憶體管理是如何做的

在介紹記憶體管理之前,先介紹一下JVM中的堆記憶體和堆外記憶體。

通常來說。JVM堆空間概念,簡單描述就是在程式中,關於物件例項|陣列的建立使用釋放的記憶體,都會在JVM中的一塊被稱作為"JVM堆"記憶體區域內進行管理分配。

Flink程式在建立物件後,JVM會在堆內記憶體中分配一定大小的空間,建立Class物件並返回物件引用,Flink儲存物件引用,同時記錄佔用的記憶體資訊。

而堆外記憶體如果你有過Java相關程式設計經歷的話,相信對堆外記憶體的使用並不陌生。其底層呼叫基於C的JDK Unsafe類方法,通過指標直接進行記憶體的操作,包括記憶體空間的申請、使用、刪除釋放等。

介紹完了堆內記憶體和堆外記憶體的概念,下面我們來看下Flink的記憶體管理。

1)JobManager記憶體管理

JobManager程序總記憶體包括JVM堆內記憶體、JVM堆外記憶體以及JVM MetaData記憶體,其中涉及的記憶體配置引數為:

```

JobManager總程序記憶體

jobmanager.memory.process.size:

作業管理器的 JVM 堆記憶體大小

jobmanager.memory.heap.size:

作業管理器的堆外記憶體大小。此選項涵蓋所有堆外記憶體使用。

jobmanager.memory.off-heap.size: ```

2)TaskManager記憶體管理

TaskManager記憶體同樣包含JVM堆內記憶體、JVM堆外記憶體以及JVM MetaData記憶體三大塊。其中JVM堆內記憶體又包含Framework Heap和Task Heap,即框架堆記憶體和任務Task堆記憶體。

JVM堆外記憶體包含Memory memory託管記憶體,主要用於儲存排序、結果快取、狀態後端資料等。另一塊為Direct Memory直接記憶體,包含如下:

  • Framework Off-Heap Memory:Flink框架的堆外記憶體,即Flink中TaskManager的自身記憶體,和slot無關。
  • Task Off-Heap:Task的堆外記憶體
  • Network Memory:網路記憶體

其中涉及的記憶體配置引數為:

``` // tm的框架堆內記憶體 taskmanager.memory.framework.heap.size=

// tm的任務堆內記憶體 taskmanager.memory.task.heap.size

// Flink管理的原生託管記憶體 taskmanager.memory.managed.size= taskmanager.memory.managed.fraction=

// Flink 框架堆外記憶體 taskmanager.memory.framework.off-heap.size=

// Task 堆外記憶體 taskmanager.memory.task.off-heap.size=

// 網路資料交換所使用的堆外記憶體大小 taskmanager.memory.network.min: 64mb taskmanager.memory.network.max: 1gb taskmanager.memory.network.fraction: 0.1 ```

12 Flink和Spark Streaming有什麼區別

1)設計理念

  • Spark是批處理框架,其中的SparkStreaming在Spark的基礎上實現的微批處理工作,支援秒級別延遲。
  • Flink是徹底的流處理框架,可以處理有界流和無流資料,達到流批一體,延遲低,真正做到來一條資料立馬處理。
  • spark本身是無狀態的,基於RDD計算。Flink基於事件驅動,既能進行有狀態計算,也可以進行無狀態計算。

2)流批一體

  • Spark通過逼近最小微批的方式達到近實時的效果,本質上還是批處理。
  • Flink本身內部就是處理無界的實時流,通過時間間隔限制,將無界流轉換為有界流,實現流批一體。

3)應用場景

  • Spark擅長處理資料量非常大而且邏輯複雜的批資料處理、基於歷史資料的互動式查詢等
  • Flink擅長處理低延遲實時資料處理場景,比如實時日誌報表分析等。
  • Spark社群更為活躍,且生態比較豐富,特別是機器學習方面;Flink正在逐漸完善社群和生態影響力。

4)相同點

  • 均提供統一的批處理和流處理API,支援高階程式語言和SQL
  • 都基於記憶體計算,速度快
  • 都支援Exactly-once一致性
  • 都有完善的故障恢復機制

二 進階篇

13 Flink/Spark/Hive SQL的執行原理

這裡我把三個元件SQL執行原理放到了一起,通過對比加深一下印象。

1)Hive SQL的執行原理

Hive SQL是Hive提供的SQL查詢引擎,底層由MapReduce實現。Hive根據輸入的SQL語句執行詞法分析、語法樹構建、編譯、邏輯計劃、優化邏輯計劃以及物理計劃等過程,轉化為Map Task和Reduce Task最終交由Mapreduce引擎執行。

  • 執行引擎。具有mapreduce的一切特性,適合大批量資料離線處理,相較於Spark而言,速度較慢且IO操作頻繁
  • 有完整的hql語法,支援基本sql語法、函式和udf
  • 對錶資料儲存格式有要求,不同儲存、壓縮格式效能不同

2)Spark SQL的執行原理

Spark SQL底層基於Spark引擎,使用Antlr解析語法,編譯生成邏輯計劃和物理計劃,過程和Hive SQL執行過程類似,只不過Spark SQL產生的物理計劃為Spark程式。

  • 輸入編寫的Spark SQL
  • SqlParser分析器。進行語法檢查、詞義分析,生成未繫結的Logical Plan邏輯計劃(未繫結查詢資料的元資料資訊,比如查詢什麼檔案,查詢那些列等)
  • Analyzer解析器。查詢元資料資訊並繫結,生成完整的邏輯計劃。此時可以知道具體的資料位置和物件,Logical Plan 形如from table -> filter column -> select 形式的樹結構
  • Optimizer優化器。選擇最好的一個Logical Plan,並優化其中的不合理的地方。常見的例如謂詞下推、剪枝、合併等優化操作
  • Planner使用Planing Strategies將邏輯計劃轉化為物理計劃,並根據最佳策略選擇出的物理計劃作為最終的執行計劃
  • 呼叫Spark Plan Execution執行引擎執行Spark RDD任務

3)Flink SQL的執行原理

Flink SQL的執行原理和Hive以及Spark SQL的執行原理大同小異,均存在解析、校驗、編譯生成語法樹、優化生成邏輯計劃等步驟。

  • Parser:SQL解析。底層通過JavaCC解析SQ語法,並將SQL解析為未經校驗的AST語法樹。
  • Validate:SQL校驗。這裡會校驗SQL的合法性,比如Schema、欄位、資料型別等是否合法(SQL匹配程度),過程需要與sql儲存的元資料結合查驗。
  • Optimize:SQL優化。Flink內部使用多種優化器,將前面步驟的語法樹進一步優化,針對RelNode和生成的邏輯計劃,隨後生成物理執行計劃。
  • Produce:SQL生成。將物理執行計劃生成在特定平臺的可執行程式。
  • Execute:SQL執行。執行SQL得到結果。

14 Flink的背壓遇到過嗎?怎麼解決的

Flink背壓是生產應用中常見的情況,當程式存在資料傾斜、記憶體不足狀況經常會發生背壓,我將從如下幾個方面去分析。

1)Flink背壓表現

  • 1)執行開始時正常,後面出現大量Task任務等待
  • 2)少量Task任務開始報checkpoint超時問題
  • 3)大量Kafka資料堆積,無法消費
  • 4)Flink UI的BackPressure頁面出現紅色High標識

2) 反壓一般有哪些情況

一般可以細分兩種情況:

  • 當前Task任務處理速度慢,比如task任務中呼叫演算法處理等複雜邏輯,導致上游申請不到足夠記憶體。
  • 下游Task任務處理速度慢,比如多次collect()輸出到下游,導致當前節點無法申請足夠的記憶體。

3) 頻繁反壓的影響是什麼

頻繁反壓會導致流處理作業資料延遲增加,同時還會影響到Checkpoint

Checkpoint時需要進行Barrier對齊,此時若某個Task出現反壓,Barrier流動速度會下降,導致Checkpoint變慢甚至超時,任務整體也變慢。

長期或頻繁出現反壓才需要處理,如果由於網路波動或者GC出現的偶爾反壓可以不必處理。

4)Flink的反壓機制

背壓時一般下游速度慢於上游速度,資料久積成疾,需要做限流。但是無法提前預估下游實際速度,且存在網路波動情況。

需要保持上下游動態反饋,如果下游速度慢,則上游限速;否則上游提速。實現動態自動反壓的效果。

下面看下Flink內部是怎麼實現反壓機制的。

  • 1)每個TaskManager維護共享Network BufferPool(Task共享記憶體池),初始化時向Off-heap Memory中申請記憶體。
  • 2)每個Task建立自身的Local BufferPool(Task本地記憶體池),並和Network BufferPool交換記憶體。
  • 3)上游Record Writer向 Local BufferPool申請buffer(記憶體)寫資料。如果Local BufferPool沒有足夠記憶體則向Network BufferPool申請,使用完之後將申請的記憶體返回Pool
  • 4)Netty Buffer拷貝buffer並經過Socket Buffer傳送到網路,後續下游端按照相似機制處理。
  • 5)當下遊申請buffer失敗時,表示當前節點記憶體不夠,則逐層傳送反壓訊號給上游,上游慢慢停止資料傳送,直到下游再次恢復。

5)反壓如何處理

  • 檢視Flink UI介面,定位哪些Task出現反壓問題
  • 檢視程式碼和資料,檢查是否出現數據傾斜
  • 如果發生資料傾斜,進行預聚合key或拆分資料
  • 加大執行記憶體,調整併發度和分割槽數
  • 其他方式。。。

由於篇幅有限,更多Flink反壓內容請檢視我的相關文章:萬字趣解Flink背壓

15 Flink的exactly-once怎麼保持

精準一次消費需要整個系統各環節均保持強一致性,包括可靠的資料來源端(資料可重複讀取、不丟失) 、可靠的消費端(Flink)、可靠的輸出端(冪等性、事務)。

Flink保持精準一次消費主要依靠checkpoint一致性快照二階段提交機制。

1)資料來源端

Flink內建FlinkKafkaConsumer類,不依賴於 kafka 內建的消費組offset管理,在內部自行記錄並維護 kafka consumer 的offset

(1)管理offset(手動提交)並儲存到checkpoint中\ (2)FlinkKafkaConsumer API內部整合Flink的Checkpoint機制,自動實現精確一次的處理語義。

從原始碼中看到stateBackend中把offset state恢復到restoredState,然後從fetcher拉取最新的offset資料,隨後將offset存入到stateBackend中;最後更新xcheckpoint。

2)Flink消費端

Flink內部採用一致性快照機制來保障Exactly-Once的一致性語義。

通過間隔時間自動執行一致性檢查點(Checkpoints)程式,b並非同步插入barrier檢查點分界線。整個流程所有的operator均會進行barrier對齊->資料完成確認->checkpoints狀態儲存,從而保證資料被精確一次處理。

3)輸出端

Flink內建二階段事務提交機制和目標源支援冪等寫入。

冪等寫入就是多次寫入會產生相同的結果,結果具有不可變性。在Flink中saveAsTextFile運算元就是一種比較典型的冪等寫入。

二階段提交則對於每個checkpoint建立事務,先預提交資料到sink中,然後等所有的checkpoint全部完成後再真正提交請求到sink, 並把狀態改為已確認,從而保證資料僅被處理一次。

為checkpoint建立事務,等到所有的checkpoint全部真正的完成後,才把計算結果寫入到sink中。

16 Flink怎麼處理遲到資料

  1. Flink內建watermark機制,可在一定程度上允許資料延遲
  2. 程式可在watermark的基礎上再配置最大延遲時間
  3. 開啟側輸出流,將延遲的資料輸出到側輸出流
  4. 程式內部控制,延遲過高的資料單獨進行後續處理

17 談談Flink的雙流JOIN

Flink雙流JOIN主要分為兩大類。一類是基於原生State的Connect運算元操作,另一類是基於視窗的JOIN操作。其中基於視窗的JOIN可細分為window joininterval join兩種。

實現原理:底層原理依賴Flink的State狀態儲存,通過將資料儲存到State中進行關聯join, 最終輸出結果。

1)基於Window Join的雙流JOIN實現機制

通俗理解,將兩條實時流中元素分配到同一個時間視窗中完成Join。兩條實時流資料快取在Window State中,當視窗觸發計算時執行join操作。

  • join運算元操作

兩條流資料按照關聯主鍵在(滾動、滑動、會話)視窗內進行inner join, 底層基於State儲存,並支援處理時間和事件時間兩種時間特徵,看下原始碼:

windows視窗、state儲存和雙層for迴圈執行join()實現雙流JOIN操作,但是此時僅支援inner join型別。

  • coGroup運算元操作

coGroup運算元也是基於window視窗機制,不過coGroup運算元比Join運算元更加靈活,可以按照使用者指定的邏輯匹配左流或右流資料並輸出,達到left join和right join的目的。

orderDetailStream   .coGroup(orderStream)   .where(r -> r.getOrderId())   .equalTo(r -> r.getOrderId())   .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))   .apply(new CoGroupFunction<OrderDetail, Order, Tuple2<String, Long>>() {     @Override     public void coGroup(Iterable<OrderDetail> orderDetailRecords, Iterable<Order> orderRecords, Collector<Tuple2<String, Long>> collector)  {       for (OrderDetail orderDetaill : orderDetailRecords) {         boolean flag = false;         for (Order orderRecord : orderRecords) {           // 右流中有對應的記錄           collector.collect(new Tuple2<>(orderDetailRecords.getGoods_name(), orderDetailRecords.getGoods_price()));           flag = true;         }         if (!flag) {           // 右流中沒有對應的記錄           collector.collect(new Tuple2<>(orderDetailRecords.getGoods_name(), null));         }       }     }   })   .print();

2)基於Interval Join的雙流JOIN實現機制

Interval Join根據右流相對左流偏移的時間區間(interval)作為關聯視窗,在偏移區間視窗中完成join操作。

滿足資料流stream2在資料流stream1的 interval(low, high)偏移區間內關聯join。interval越大,關聯上的資料就越多,超出interval的資料不再關聯。

實現原理:interval join也是利用Flink的state儲存資料,不過此時存在state失效機制ttl,觸發資料清理操作。

val env = ... // kafka 訂單流 val orderStream = ...  // kafka 訂單明細流 val orderDetailStream = ...      orderStream.keyBy(_.1)     // 呼叫intervalJoin關聯     .intervalJoin(orderDetailStream._2)     // 設定時間上限和下限     .between(Time.milliseconds(-30), Time.milliseconds(30))       .process(new ProcessWindowFunction())      class ProcessWindowFunction extends ProcessJoinFunction...{    override def processElement(...) {       collector.collect((r1, r2) => r1 + " : " + r2)    } }

訂單流在流入程式後,等候(low,high)時間間隔內的訂單明細流資料進行join, 否則繼續處理下一個流。interval join目前也僅支援inner join。

3)基於Connect的雙流JOIN實現機制

對兩個DataStream執行connect操作,將其轉化為ConnectedStreams, 生成的Streams可以呼叫不同方法在兩個實時流上執行,且雙流之間可以共享狀態。

兩個資料流被connect之後,只是被放在了同一個流中,內部依然保持各自的資料和形式,兩個流相互獨立。

[DataStream1, DataStream2] -> ConnectedStreams[1,2]

我們可以在Connect運算元底層的ConnectedStreams中編寫程式碼,自行實現雙流JOIN的邏輯處理。

  • 1)呼叫connect運算元,根據orderid進行分組,並使用process運算元分別對兩條流進行處理。

orderStream.connect(orderDetailStream)   .keyBy("orderId", "orderId")   .process(new orderProcessFunc());

  • 2)process方法內部進行狀態程式設計, 初始化訂單、訂單明細和定時器的ValueState狀態。

``` private ValueState orderState; private ValueState orderDetailState; private ValueState timeState;

// 初始化狀態Value orderState = getRuntimeContext().getState(  new ValueStateDescriptor  ("order-state",Order.class)); ···· ```

  • 3)為每個進入的資料流儲存state狀態並建立定時器。在時間視窗內另一個流達到時進行join並輸出,完成後刪除定時器。

@Override public void processElement1(Order value, Context ctx, Collector<Tuple2<Order, OrderDetail>> out){   if (orderDetailState.value() == null){     //明細資料未到,先把訂單資料放入狀態      orderState.update(value);     //建立定時器,60秒後觸發      Long ts = (value.getEventTime()+10)*1000L;      ctx.timerService().registerEventTimeTimer(        ts);      timeState.update(ts);   }else{     //明細資料已到,直接輸出到主流      out.collect(new Tuple2<>(value,orderDetailS        tate.value()));     //刪除定時器      ctx.timerService().deleteEventTimeTimer       (timeState.value());      //清空狀態,注意清空的是支付狀態       orderDetailState.clear();       timeState.clear();   } } ... @Override public void processElement2(){   ... }

  • 4)未及時達到的資料流觸發定時器輸出到側輸出流,左流先到而右流未到,則輸出左流,反之輸出右連流。

@Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<Order, OrderDetail>> out) {   // 實現左連線    if (orderState.value() != null){        ctx.output(new OutputTag<String>("left-jo         in") {},         orderState.value().getTxId());    // 實現右連線    }else{       ctx.output(new OutputTag<String>("left-jo         in") {},         orderDetailState.value().getTxId());    }    orderState.clear();    orderDetailState.clear();    timeState.clear(); }

4)Flink雙流JOIN問題處理總結

  • 1)為什麼我的雙流join時間到了卻不觸發,一直沒有輸出

檢查一下watermark的設定是否合理,資料時間是否遠遠大於watermark和視窗時間,導致視窗資料經常為空

  • 2)state資料儲存多久,會記憶體爆炸嗎

state自帶有ttl機制,可以設定ttl過期策略,觸發Flink清理過期state資料。建議程式中的state資料結構用完後手動clear掉。

  • 3)我的雙流join傾斜怎麼辦

join傾斜三板斧: 過濾異常key、拆分表減少資料、打散key分佈。當然可以的話我建議加記憶體!加記憶體!加記憶體!!

  • 4)想實現多流join怎麼辦

目前無法一次實現,可以考慮先union然後再二次處理;或者先進行connnect操作再進行join操作,僅建議~

  • 5)join過程延遲、沒關聯上的資料會丟失嗎

這個一般來說不會,join過程可以使用側輸出流儲存延遲流;如果出現節點網路等異常,Flink checkpoint也可以保證資料不丟失。

由於篇幅有限,更多Flink雙流JOIN內容請檢視我的相關文章:萬字直通Flink雙流JOIN面試

18 Flink資料傾斜遇到過嗎?怎麼處理的

資料傾斜一般都是資料Key分配不均,比如某一型別key數量過多,導致shuffle過程分到某節點資料量過大,記憶體無法支撐。

1)資料傾斜可能的情況

那我們怎麼發現數據傾斜了呢?一般是監控某任務Job執行情況,可以去Yarn UI或者Flink UI觀察,一般會出現如下狀況:

  • 發現某subTask執行時間過慢
  • 傳輸資料量和其他task相差過大
  • BackPressure頁面出現反壓問題(紅色High標識)

結合以上的排查定位到具體的task中執行的運算元,一般常見於Keyed型別運算元:比如groupBy()、rebance()等產生shuffle過程的操作。

2)資料傾斜的處理方法

  • 資料拆分。如果能定位的資料傾斜的key,總結其規律特徵。比如發現包含某字元,則可以在程式碼中把該部分資料key拆分出來,單獨處理後拼接。
  • key二次聚合。兩次聚合,第一次將key加字首聚合,分散單點壓力;隨後去除字首後再次聚合,得到最終結果。
  • 調整引數。加大TaskManager記憶體、keyby均衡等引數,一般效果不是很好。
  • 自定義分割槽或聚合邏輯。繼承分區劃分、聚合計算介面,根據資料特徵和自定義邏輯,調整資料分割槽並均勻打散資料key。

19 Flink資料重複怎麼辦

一般來說Flink可以開啟exactly-once機制,可保證精準一次消費。但是如果存在資料處理過程異常導致資料重複,可以藉助一些工具或者程式來處理。

建議資料量不大的話可以使用flink自身的state或者藉助bitmap結構;稍微大點可以用布隆過濾器或hyperlog工具;其次使用外部介質(redis或hbase)設計好key就行自動去重,只不過會增加處理過程。

總結一下Flink的去重方式:

  • 記憶體去重。採用Hashset等資料結構,讀取資料中類似主鍵等唯一性標識欄位,在記憶體中儲存並進行去重判斷。
  • 使用Redis Key去重。藉助Redis的Hset等特殊資料型別,自動完成Key去重。
  • DataFrame/SQL場景,使用group byover()window開窗等SQL函式去重
  • 利用groupByKey等聚合運算元去重

20 聊聊公司的Flink實時數倉架構,為什麼這麼設計

實時數倉資料規整為層級儲存,每層獨立加工。整體遵循由下向上建設思想,最大化資料賦能。

1)數倉分層設計

  • 資料來源: 分為日誌資料業務資料兩大類,包括結構化和非結構化資料。

  • 數倉型別:根據及時性分為離線數倉和實時數倉

  • 技術棧:

    • 採集(Sqoop、Flume、CDC)
    • 儲存(Hive、Hbase、Mysql、Kafka、資料湖)
    • 加工(Hive、Spark、Flink)
    • OLAP查詢(Kylin、Clickhous、ES、Dorisdb)等。

2)數倉架構設計

整體採用Lambda架構。保留實時、離線兩條處理流程,即最終會同時構建實時數倉和離線數倉。

1. 技術實現

  • 使用Flink和Kafka、Hive為主要技術棧
  • 實時技術流程。通過實時採集程式同步資料到Kafka訊息佇列
  • Flink實時讀取Kafka資料,回寫到kafka ods貼源層topic
  • Flink實時讀取Kafka的ods層資料,進行實時清洗和加工,結果寫入到kafka dwd明細層topic
  • 同樣的步驟,Flink讀取dwd層資料寫入到kafka dws彙總層topic
  • 離線技術流程和前面章節一致
  • 實時olap引擎查詢分析、報表展示

2. 優缺點

  • 兩套技術流程,全面保障實時性和歷史資料完整性
  • 同時維護兩套技術架構,維護成本高,技術難度大
  • 相同資料來源處理兩次且儲存兩次,產生大量資料冗餘和操作重複
  • 容易產生資料不一致問題

3)資料流程設計

整體從上而下,資料經過採集 -> 數倉明細加工彙總 -> 應用步驟,提供實時數倉服務。

這裡列舉使用者分析的資料流程和技術路線:

採集使用者行為資料,統計使用者曝光點選資訊,構建使用者畫像。

電商實時數倉使用者分析資料流程

電商實時數倉使用者分析資料流程

》》更多好文,請關注gzh:大資料兵工廠