兩萬字Flink筆記

語言: CN / TW / HK

Flink 基礎

Flink特性

流式計算是大資料計算的痛點,第1代實時計算引擎Storm對Exactly Once 語義和視窗支援較弱,使用的場景有限且無法支援高吞吐計算;Spark Streaming 採用“微批處理”模擬流計算,在視窗設定很小的場景中有效能瓶頸,Spark 本身也在嘗試連續執行模式(Continuous Processing),但進展緩慢。

Flink是一個低延遲、高吞吐的實時計算引擎,其利用分散式一致性快照實現檢查點容錯機制,並實現了更好的狀態管理,Flink可在毫秒級的延遲下處理上億次/秒的訊息或者事件,同時提供了一個Exactly-once的一致性語義,保證了資料的正確性,使得Flink可以提供金融級的資料處理能力,總結其高階特性包括CSTW(CheckPoint,Statue,Time,windows)

Flink和Spark對比

設計思路

Spark的技術理念是基於批來模擬流,微批處理的延時較高(無法優化到秒以下的數量級),且無法支援基於event_time的時間視窗做聚合邏輯。Flink和spark相反,它基於流計算來模擬批計算,更切合資料的生成方式,技術上有更好的擴充套件性。

狀態管理

流處理任務要對資料進行統計,如Sum, Count, Min, Max,這些值是需要儲存的,因為要不斷更新,這些值或者變數就可以理解為一種狀態,如果資料來源是在讀取Kafka, RocketMQ,可能要記錄讀取到什麼位置,並記錄Offset,這些Offset變數都是要計算的狀態。

Flink提供了內建的狀態管理,可以把這些狀態儲存在Flink內部,而不需要把它儲存在外部系統,這樣做的好處:

① 降低了計算引擎對外部系統的依賴以及部署,使運維更加簡單;

② 對效能帶來了極大的提升:如果通過外部去訪問如Redis , HBase 需要網路及RPC資源,如果通過Flink內部去訪問,只通過自身的程序去訪問這些變數。

同時Flink會定期將這些狀態做Checkpoint持久化,把Checkpoint儲存到一個分散式的持久化系統中,比如HDFS,這樣當Flink的任務出現任何故障時,它都會從最近的一次Checkpoint將整個流的狀態進行恢復,然後繼續執行它的流處理,對使用者沒有任何資料上的影響。

Flink 初探

設計架構

Flink是一個分層的架構系統,每一層所包含的元件都提供了特定的抽象,用來服務於上層元件,Flink的分層體現有四層,分別是Deploy層、core層、API層/Libraries層,其中Deploy層主要涉及的是Flink的部署模式及同資源排程元件的互動模式,Core層提供了支援Flink計算的全部核心實現,API層/Libraries層提供了Flink的API介面和基於API介面的特定應用的計算框架;

Deploy層: 該層主要涉及了Flink的部署模式,Flink支援多種部署模式:本地、叢集(Standalone/YARN)、雲(GCE/EC2),Standalone 部署模式與Spark類似;

Runtime層: Runtime層提供了支援Flink計算的全部核心實現,比如:支援分散式Stream處理、Job Graph到Execution Graph的對映、排程 等,為上層API層提供基礎服務。

API層: API層主要實現了面向無界Stream的流處理和麵向Batch的批處理API,其中面向流處理對應DataStream API,面向批處理對應DataSet API。

Libraries層: 該層也可以稱為Flink應用框架層,根據API層的劃分,在API層之上構建的滿足特定應用的實時計算框架,也分別對應於面向流處理 和麵向批處理兩類。面向流處理支援:CEP(複雜事件處理)、SQL-like的操作(基於Table的關係操作);面向批處理支援:FlinkML(機器學習庫)、Gelly(圖處理)。

Flink on yarn

Flink支援增量迭代,具有對迭代自行優化的功能,因此在on yarn上提交的任務效能略好於 Spark,Flink提供2種方式在yarn上提交任務:啟動1個一直執行的 Yarn session(分離模式)和在 Yarn 上執行1個 Flink 任務(客戶端模式);

分離模式: 通過命令yarn-session.sh的啟動方式本質上是在yarn叢集上啟動一個flink叢集,由yarn預先給flink叢集分配若干個container,在yarn的介面上只能看到一個Flink session with X TaskManagers的任務,並且只有一個Flink介面,可以從Yarn的Application Master連結進入;

客戶端模式: 通過命令bin/flink run -m yarn-cluster啟動,每次釋出1個任務,本質上給每個Flink任務啟動了1個叢集,yarn在任務釋出時啟動JobManager(對應Yarn的AM)和TaskManager,如果一個任務指定了n個TaksManager(-yn n),則會啟動n+1個Container,其中一個是JobManager,釋出m個應用,則有m個Flink介面,不同的任務不可能在一個Container(JVM)中,實現了資源隔離。

進入Flink的bin目錄下執行./yarn-session.sh –help 檢視幫助驗證yarn是否成功配置,使用./yarn-session.sh –q 顯示yarn所有nodeManager節點資源;部署On yarn模式的Flink只需要修改配置conf/flink-conf.yaml ,詳細引數請參考官網:通用配置:Configuration,HA配置:High Availability (HA)

採用分離模式來啟動Flink Yarn Session,提交後提示該yarn application成功提交到yarn並返回id,使用yarn application –kill application_id 來停止yarn上提交的任務;

yarn-session.sh -n 3 -jm 700 -tm 700 -s 8 -nm FlinkOnYarnSession -d –st

可以直接提交自帶的詞頻統計用例,驗證on yarn模式是否配置成功:

~/bin/flink run -m yarn-cluster -yn 4 -yjm 2048 -ytm 2048 ~/flink/examples/batch/WordCount.jar

流程分析

分離模式: 通過命令yarn-session.sh先啟動叢集,然後再提交作業,接著會向yarn申請一塊空間後,資源永遠保持不變。如果資源滿了,下一個作業就無法提交,只能等到yarn中的其中一個作業執行完成後,釋放了資源,下個作業才會正常提交。所有作業共享Dispatcher和ResourceManager;共享資源;適合規模小執行時間短的作業。

客戶端模式:

通過命令bin/flink run -m yarn-cluster提交任務,每提交一個作業會根據自身的情況,都會單獨向yarn申請資源,直到作業執行完成,一個作業的失敗與否並不會影響下一個作業的正常提交和執行,適合規模大長時間執行的作業;

DataStream

DataStream是Flink的較低階API,用於進行資料的實時處理任務,可以將該程式設計模型分為 DataSource、Transformation、Sink 三個部分;

DataSource

源是程式讀取輸入資料的位置,可以使用 StreamExecutionEnvironment.addSource(sourceFunction) 將源新增到程式,Flink 有許多預先實現的源函式,也可以通過實現 SourceFunction 方法自定義非並行源 ,或通過實現 ParallelSourceFunction 或擴充套件 RichParallelSourceFunction 自定義並行源。

有幾個預定義的流資料來源可從 StreamExecutionEnvironment 訪問:

基於檔案:

readTextFile(path) #逐行讀取文字檔案(檔案符合 TextInputFormat 格式),並作為字串返回每一行。
 
readFile(fileInputFormat, path) #按指定的檔案輸入格式(fileInputFormat)讀取指定路徑的檔案。
 
readFile(fileInputFormat, path, watchType, interval, pathFilter) #前兩個方法的內部呼叫方法。根據給定檔案格式(fileInputFormat)讀取指定路徑的檔案。根據 watchType,定期監聽路徑下的新資料(FileProcessingMode.PROCESS_CONTINUOUSLY),或者處理當前在路徑中的資料並退出(FileProcessingMode.PROCESS_ONCE),使用 pathFilter,可以進一步排除正在處理的檔案。

基於Socket:socketTextStream 從 Socket 讀取,元素可以用分隔符分隔。

基於集合:

fromCollection(Seq) #用 Java.util.Collection 物件建立資料流,集合中的所有元素必須屬於同一型別;
 
fromCollection(Iterator) #用迭代器建立資料流。指定迭代器返回的元素的資料型別;
 
fromElements(elements: _*) #從給定的物件序列建立資料流。所有物件必須屬於同一型別;
 
fromParallelCollection(SplittableIterator) #並行地從迭代器建立資料流。指定迭代器返回的元素的資料型別;
 
generateSequence(from, to) #並行生成給定間隔的數字序列。

自定義: addSource 附加新的源函式。例如從 Apache Kafka 中讀取,可以使用 addSource(new FlinkKafkaConsumer08<>(...))。請詳細檢視 聯結器。

Transformation

Transformation操作將1個或多個DataStream轉換為新的DataStream,多個轉換組合成複雜的資料流拓撲,如下圖所示,DataStream會由不同的Transformation操作、轉換、過濾、聚合成其他不同的流,從而完成業務要求;

Map: DataStream -> DataStream,一個數據元生成一個新的資料元。將輸入流的元素翻倍:dataStream.map { x => x * 2 }

FlatMap:DataStream -> DataStream,一個數據元生成多個數據元(可以為0)。將句子分割為單詞:

dataStream.flatMap { str => str.split(" ") }

Filter: DataStream -> DataStream,每個資料元執行布林函式,只儲存函式返回 true 的資料元。過濾掉零值的過濾器:

dataStream.filter { _ != 0 }

KeyBy : DataStream -> KeyedStream,將流劃分為不相交的分割槽。具有相同 Keys 的所有記錄在同一分割槽。指定 key 的取值:

dataStream.keyBy("someKey") // Key by field "someKey"
 
dataStream.keyBy(0) // Key by the first element of a Tuple

Reduce : KeyedStream -> DataStream,KeyedStream 元素滾動執行 Reduce。將當前資料元與最新的一個 Reduce 值組合作為新值傳送。建立 key 的值求和:keyedStream.reduce { _ + _ }

Aggregations : KeyedStream -> DataStream,應用於 KeyedStream 上的滾動聚合。

Window: KeyedStream -> WindowedStream,Windows 可以在已經分割槽的 KeyedStream 上定義。Windows 根據某些特徵(例如,在最近5秒內到達的資料)對每個Keys中的資料進行分組。更多說明參考 Windows 或 譯版。

dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)))

WindowAll : DataStream -> AllWindowedStream,Windows 也可以在 DataStream 上定義。在許多情況下,這是非並行轉換。所有記錄將收集在 windowAll 運算元的一個任務中。

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))

Window Apply : WindowedStream -> DataStream 或 AllWindowedStream -> DataStream,將函式應用於整個視窗。一個對視窗資料求和:

windowedStream.apply { WindowFunction }
 
allWindowedStream.apply { AllWindowFunction }

Window Reduce: WindowedStream -> DataStream,Reduce 函式應用於視窗並返回結果值。windowedStream.reduce { _ + _ }

Aggregations on windows: WindowedStream -> DataStream,聚合視窗內容;

Union : DataStream* -> DataStream,兩個或多個數據流的合併,建立包含來自所有流的所有資料元的新流。如果將資料流與自身聯合,則會在結果流中獲取兩次資料元。

dataStream.union(otherStream1, otherStream2, ...)

Window Join : DataStream,DataStream -> DataStream,Join 連線兩個流,指定 Key 和視窗。

dataStream.join(otherStream)
 
    .where(<key selector>).equalTo(<key selector>)
 
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
 
    .apply { ... }

Window CoGroup : DataStream,DataStream -> DataStream,CoGroup 連線兩個流,指定 Key 和視窗。

dataStream.coGroup(otherStream)
 
    .where(0).equalTo(1)
 
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
 
    .apply {}

CoGroup 與 Join 的區別: CoGroup 會輸出未匹配的資料,Join 只輸出匹配的資料

Connect : DataStream,DataStream -> ConnectedStreams,連線兩個有各自型別的資料流。允許兩個流之間的狀態共享。

someStream : DataStream[Int] = ...
 
otherStream : DataStream[String] = ...
 
val connectedStreams = someStream.connect(otherStream)

可用於資料流關聯配置流;

CoMap, CoFlatMap : ConnectedStreams -> DataStream,作用域連線資料流(connected data stream)上的 map 和 flatMap:

Split : DataStream -> SplitStream,將資料流拆分為兩個或更多個流。

Select : SplitStream -> DataStream,從 SpliteStream 中選擇一個流或多個流。

val even = split select "even"
 
val odd = split select "odd"
 
val all = split.select("even","odd")

Iterate : DataStream -> IterativeStream -> DataStream,將一個運算元的輸出重定向到某個先前的運算元,在流中建立 feedback 迴圈。這對於定義不斷更新模型的演算法特別有用。以下程式碼以流開頭並連續應用迭代體。大於0的資料元將被髮送回 feedback,其餘資料元將向下遊轉發。

Project: DataStream -> DataStream,作用於元組的轉換,從元組中選擇欄位的子集。

DataStream<Tuple3<Integer, Double, String>> in = // [...]
 
DataStream<Tuple2<String, Integer>> out = in.project(2,0);

Sink

Data Sink 消費 DataStream 並轉發到檔案,套接字,外部系統或列印到頁面。Flink 帶有各種內建輸出格式,封裝在 DataStreams 上的運算元操作後面:

writeAsText() / TextOutputFormat:按字串順序寫入檔案。通過呼叫每個元素的 toString() 方法獲得字串。

writeAsCsv(...) / CsvOutputFormat:將元組寫為逗號分隔的形式寫入檔案。行和欄位分隔符是可配置的。每個欄位的值來自物件的 toString() 方法。

print() / printToErr():在標準輸出/標準錯誤流上列印每個元素的 toString() 值。可以定義輸出字首,這有助於區分不同的列印呼叫。如果並行度大於1,輸出也包含生成輸出的任務的識別符號。

writeUsingOutputFormat() / FileOutputFormat:自定義檔案輸出的方法和基類。支援自定義物件到位元組的轉換。

writeToSocket:將元素寫入 Socket,使用 SerializationSchema 進行序列化。

addSink:呼叫自定義接收器函式。請詳細檢視 聯結器。

DataStream 的 write*() 方法主要用於除錯目的。他們沒有參與 Flink checkpoint,這意味著這些函式通常具有至少一次的語義。重新整理到目標系統的資料取決於 OutputFormat 的實現,並非所有傳送到 OutputFormat 的資料都會立即顯示在目標系統中。此外,在失敗的情況下,這些記錄可能會丟失。

要將流可靠、準確地傳送到檔案系統,請使用 flink-connector-filesystem。通過 .addSink(...) 方法的自定義實現,可以實現在 checkpoint 中精確一次的語義。

Time

流式資料處理最大的特點是資料具有時間屬性特徵,Flink根據時間產生的位置不同,將時間區分為三種概念:資料生成時間(Event_time)、事件接入時間(Ingestion_time)、事件處理時間(Processing_time),使用者可以根據需要選擇事件型別作為流式資料的時間屬性,極大增強了資料處理的靈活性和準確性;

Event_time:獨立事件在產生它的裝置上的發生時間,這個時間通常在到達Flink之前已經嵌入到生產資料中,因此時間順序取決於事件產生的地方,和下游的資料處理系統的事件無關,需要在Flink中指定事件的時間屬性或者設定時間提取器提取事件時間;

Processing_time:指在操作運算元計算過程中獲取到的所在主機的時間,使用者選擇了Processing_time後,所有和時間相關的計算運算元都直接使用其所在主機的系統時間,使用Processing_time的程式效能相對較高,延時相對較低,因為其所有操作不需要做任何時間上的對比和協調;

Ingestion_time:指資料接入Flink系統的時間,依賴於Source Operator所在主機的系統時鐘;

一般場景中選擇event_time作為事件時間戳是最貼近生產的,但大多數情況下由於資料的延遲和亂序使用processing_time;

Window視窗

Windows定義和分類

在流式計算中,資料持續不斷的流入計算引擎,需要一個視窗限定計算範圍,比如監控場景的近2分鐘或者精準計算的每隔2分鐘計算一次,視窗定義了該範圍,輔助完成有界範圍的資料處理;

Flink的DataStream API將視窗抽象成獨立的Operator,且支援很多視窗運算元,每個視窗運算元包含Window Assigner 、Windows Function、觸發器、剔除器、時延設定等部分屬性,其中Window Assigner 和 Windows Function是必須要指定的屬性;

Window Assigner用來決定某個元素被分配到哪個/哪些視窗中去;Trigger觸發器決定了一個視窗何時能夠被計算或清除,每個視窗都會擁有一個自己的Trigger;

Evictor驅逐者在Trigger觸發之後,在視窗被處理之前,Evictor(如果有Evictor的話)會用來剔除視窗中不需要的元素,相當於一個filter。

Flink支援多種視窗型別,按照驅動型別分為:時間驅動的Time Window(如每30秒鐘)和資料驅動的Count Window(如每100個事件),按照視窗的滾動方式又可以分成:翻滾視窗(Tumbling Window,無重疊),滾動視窗(Sliding Window,有重疊)和會話視窗(Session Window,活動間隙),下圖可以看出分類區別:

Time Window 是根據時間對資料流進行分組的,且視窗機制和時間型別是完全解耦的,也就是說當需要改變時間型別時(三種時間)不需要更改視窗邏輯相關的程式碼,Time Window 中常見的即為Tumbling Time Window和Sliding Time Window;

Count Window 是根據元素個數對資料流進行分組的,也包括Tumbling Count Window和Sliding Count Window;

Windows實現

上圖中的元件都位於一個運算元(window operator)中,資料流源源不斷地進入運算元,每一個到達的元素都會被交給 WindowAssigner,WindowAssigner 會決定元素被放到哪個或哪些視窗(window),Window本身是一個ID識別符號,其內部可能儲存了一些元資料,如TimeWindow中有開始和結束時間,但是並不會儲存視窗中的元素。視窗中的元素實際儲存在 Key/Value State 中,key為Window,value為元素集合(或聚合值)。為了保證視窗的容錯性,該實現依賴了 Flink 的 State 機制。

每一個視窗都擁有一個屬於自己的 Trigger,Trigger上會有定時器,用來決定一個視窗何時能夠被計算或清除,每當有元素加入到該視窗,或者之前註冊的定時器超時了,那麼Trigger都會被呼叫。Trigger的返回結果可以是 continue(不做任何操作),fire(處理視窗資料),purge(移除視窗和視窗中的資料),或者 fire + purge。一個Trigger的呼叫結果只是fire的話,那麼會計算視窗並保留視窗原樣,也就是說視窗中的資料仍然保留不變,等待下次Trigger fire的時候再次執行計算。一個視窗可以被重複計算多次知道它被 purge 了。在purge之前,視窗會一直佔用著記憶體。

當Trigger fire了,視窗中的元素集合就會交給Evictor(如果指定了的話)。Evictor 主要用來遍歷視窗中的元素列表,並決定最先進入視窗的多少個元素需要被移除。剩餘的元素會交給使用者指定的函式進行視窗的計算。如果沒有 Evictor 的話,視窗中的所有元素會一起交給函式進行計算。

計算函式收到了視窗的元素(可能經過了 Evictor 的過濾),並計算出視窗的結果值,併發送給下游。視窗的結果值可以是一個也可以是多個。DataStream API 上可以接收不同型別的計算函式,包括預定義的sum(),min(),max(),還有 ReduceFunction,FoldFunction,還有WindowFunction。WindowFunction 是最通用的計算函式,其他的預定義的函式基本都是基於該函式實現的。

Flink 對於一些聚合類的視窗計算(如sum,min)做了優化,因為聚合類的計算不需要將視窗中的所有資料都儲存下來,只需要儲存一個result值就可以了。每個進入視窗的元素都會執行一次聚合函式並修改result值。這樣可以大大降低記憶體的消耗並提升效能。但是如果使用者定義了 Evictor,則不會啟用對聚合視窗的優化,因為 Evictor 需要遍歷視窗中的所有元素,必須要將視窗中所有元素都存下來。

Windows Function

在運用視窗計算時,Flink根據上有資料集是否是KeyedStream型別(資料是否按照Key分割槽),如果上游資料未分組則呼叫window()方法指定Windows Assigner,資料會根據Key在不同Task例項中平行計算,最後得出針對每個Key的統計結果,如果是Non-Keyed型別則呼叫WindowsAll()方法指定Windows Assigner,所有的資料都會在視窗運算元中路由得到一個Task中計算,並得到全域性統計結果;

定義完視窗分配器後,需要為每一個視窗指定計算邏輯,也就是Windows Function,Flink提供了四種類型Window Function,分別是ReduceFunction、AggreateFunction、FoldFunction、ProcessWindowFunction,其中FoldFunction將逐漸不再使用;四種類型有分為增量聚合操作(ReduceFunction、AggreateFunction、FoldFunction)和全量聚合操作(ProcessWindowFunction);

增量聚合函式計算效能高,佔用儲存空間少,因為其只需要維護視窗的中間結果狀態值,不需要快取原始資料;全量聚合函式使用代價相對高,效能較弱,因為運算元需要快取該視窗的接入資料,然後等視窗觸發後對所有原始資料進行彙總計算,若接入資料量大或視窗時間長容易導致計算效能下降;

ReduceFunction和AggreateFunction相似,但前者的輸出型別和輸入型別一致(如使用tuple的某個欄位聚合),後者更加靈活地提供3個複寫方法,add()定義資料的新增邏輯,getResult()定義根據Accumulator計算結果的邏輯,merge()方法定義合併accumulator的邏輯;

ProcessWindowFunction可以支撐更復雜的運算元,其支援基於視窗全部資料元素的結果計算,當運算元需要視窗的元資料或狀態資料,或者運算元不支援運算交換律和結合律(統計所有元素的中位數和眾數),需要該函式中的Context物件,Context類定義了Window的元資料及可以操作的Window的狀態資料包括GlobalState和WindowState;

大部分情況下,需要增量計算和全量計算結合,因為增量計算雖然一定程度能夠提升視窗效能,但靈活性不及ProcessWindowFunction,兩者整合使用,既可以得到增量運算元又可以得到視窗的元資料(視窗開始、終止時間等),比如在計算TOP N的場景中,分視窗計算完資料的計算後需要根據商品ID匯聚總的點選數;

Watermark

由於網路或系統等外部因素影響,事件資料不能及時傳輸到Flink系統中,導致資料亂序、延遲等問題,因此需要一種機制能夠控制資料處理的過程和進度;基於event_time時間的Windows建立後,具體如何確定屬於該Windows中的資料元素已經全部到達,如果確定全部到達就可以對所有資料進行視窗計算操作(彙總、分組),如果資料沒有全部到達,則繼續等待該視窗中的資料,但是又不能無限期的等下去,需要有機制來保證一個特定的時間後,必須觸發window去進行計算了,此時watermark發揮作用了,它表示當達到watermark後,在watermark之前的資料已經全部達到(即使後面還有延遲的資料);Watermark是處理EventTime 視窗計算提出的機制,本質上是一種時間戳,可以在讀取 Source時候指定或者在transformation操作之前,用自定義的Watermark生成器按照需求指定;

正常情況下,流式資料的到達時間是有序的,如下圖:

一般情況存在資料的亂序(out-of-order)和延遲(late element),此時水位線機制能表明該時間戳之前到當前水位線時間戳的資料已經全部達到,沒有比它(水位線)更早的資料了,並觸發計算;

Flink中生成水位線的方式有兩種:Periodic Watermarks(週期性)和Punctuated Watermarks,前者假設當前時間戳減去固定時間,所有資料都能達到,後者要在特定事件指示後觸發生成水位線;

舉例說明Periodic Watermarks 工作方式:當前window為10s,設想理想情況下訊息都沒有延遲,那麼eventTime等於系統當前時間,假如設定watermark等於eventTime的時候,當watermark = 00:00:10的時候,就會觸發w1的計算,這個時後因為訊息都沒有延遲,watermark之前的訊息(00:00:00~00:00:10)都已經落入到window中,所以會計算window中全量的資料。那麼假如有一條訊息eventTime是00:00:01 應該屬於w1,在00:00:11才到達,因為假設訊息沒有延遲,那麼watermark等於當前時間,00:00:11,這個時候w1已經計算完畢,那麼這條訊息就會被丟棄,沒有加入計算,這樣就會出現問題。這是已經可以理解,程式碼中為什麼要減去一個常量作為watermark,假設每次提取eventTime的時減去2s,那麼當data1在00:00:11到達的時候,watermark是00:00:09這個時候,w1還沒有觸發計算,那麼data1會被加入w1,這個時候計算完全沒有問題,所以減去一個常量是為了對延時的訊息進行容錯;

Punctuated Watermarks提供自定義條件生成水位,例如判斷某個資料元素的當前狀態或tuple型別的某個值,如果接入事件中狀態為0則觸發生成watermark,如果狀態不為0則不觸發,需要分別複寫extractTimestamp和checkAndGetNextWatermark方法;

Flink允許提前預定義資料的提取器Timestamp Extractors,在讀取source時候定義提取時間戳;

延遲資料

基於Event_time的視窗計算雖然可以使用warterMark機制容忍部分延遲,但只能一定程度的緩解該問題,無法應對某些延遲特別嚴重的場景。Flink預設丟失延遲資料,但使用者可以自定義延遲資料的處理方式,此時需要Allowed Lateness機制近資料的額外處理;

DataStream API提供Allowed Lateness方法指定是否對遲到資料進行處理,引數是Time型別的時間間隔大小,代表允許的最大延遲時間,Flink的視窗計算中會將Window的Endtime加上該時間作為視窗最後釋放的結束時間(P),當接入的資料中Event time未超過該時間(P),但WaterMark已經超過Window的Event_Time時直接觸發視窗計算,若Event_Time超過了時間P,則做丟棄處理;

通常情況下可以使用sideOutputLateData 方法對遲到資料進行標記,然後使用getSideOutput()方法得到被標記的延遲資料,分析延遲原因;

多流合併/關聯

合併

Connect:Flink 提供connect方法實現兩個流或多個流的合併,合併後生成ConnectedStreams,會對兩個流的資料應用不同的處理方法,並且雙流之間可以共享狀態(比如計數);ConnectedStream提供的map()和flatMap()需要定義CoMapFunction和CoFlatMapFunction分別處理輸入的DataStream資料集;

Union:Union運算元主要實現兩個或者多個輸入流合併成一個數據集,需要保證兩個流的格式一致,輸出的流與輸入完全一致;

關聯

Flink支援視窗的多流關聯,即在一個視窗上按照相同條件對多個輸入流進行join操作,需要保證輸入的Stream構建在相同的Windows上,且有相同型別的Key做為關聯條件;

資料集inputStream1通過join方法形成JoinedStreams型別資料集,呼叫where()方法指定inputStream1資料集的key,呼叫equalTo()方法指定inputStream2對應關聯的key,通過window()方法指定Window Assigner,最後通過apply()方法中傳入使用者自定義的JoinFunction或者FlatJoinFunction對輸入資料元素進行視窗計算;

Windows Join過程中所有的Join操作都是Inner Join型別,也就是必須滿足相同視窗中,每個Stream都有Key,且key相同才能完成關聯操作並輸出結果;

狀態和容錯

有狀態計算是Flink重要特性,其內部儲存計算產生的中間結果並提供給後續的Function或運算元使用,狀態資料維繫在本地儲存中,可以是Flink的堆記憶體或者堆外記憶體中,也可以藉助於第三方的儲存介質,同storm+ redis / hbase模式相比,Flink完善的狀態管理減少了對外部系統的依賴,減少維護成本;

State和型別

Flink根據資料集是否根據key分割槽將狀態分為Keyed State和 Operator State兩種型別,Keyed State只能用於KeyedStream型別資料集對應的Function和Operation上,它是Operator State的特例;

Operator State只和並行的運算元例項繫結,和資料元素中的key無關,支援當運算元例項並行度發生變化後自動重新分配狀態資料;

Keyed State和 Operator State均有兩種形式,一種是託管狀態,一種是原始狀態,前者有Flink Runtime控制和管理狀態資料並將狀態資料轉換成記憶體Hash tables 或RocksDB的物件儲存,後者由運算元自己管理資料結構,當觸發CheckPoint後,Flink並不知道狀態資料內部的資料結構,只是將資料轉換成bytes資料儲存在CheckPoint中,當從Checkpoint恢復任務時,運算元自己反序列化出狀態的資料結構;

CheckPoint 和SavePoint

Flink基於輕量級分散式快照演算法提供了CheckPoint機制,分散式快照可以將同一時間點的Task/Operator狀態資料全域性統一快照處理,包括Keyed State和Operator State

Savepoints是檢查點的一種特殊實現,底層使用CheckPoint機制,Savepoint是使用者以手工命令方式觸發CheckPoint,並將結果持久化到指定的儲存路徑中,其主要目的是幫助使用者在升級和維護叢集過程中儲存系統的狀態資料,避免因停機運維或者升級到知道正常終止的應用資料狀態無法恢復。

參考內容:

https://www.cnblogs.com/leesf456/p/11136344.html https://blog.csdn.net/a_drjiaoda/article/details/89357916 https://www.jianshu.com/p/9e92cefa9d4e http://wuchong.me/blog/2018/11/18/flink-tips-watermarks-in-apache-flink-made-easy/

- END -