Flink Watermark 機制及總結
作者 :黃龍,騰訊 CSIG 高階工程師
Flink Watermark
前言
Flink 水印機制,簡而言之,就是在 Flink 使用 Event Time 的情況下,視窗處理事件亂序和事件延遲的一種設計方案。本文從基本的概念入手,來看下 Flink 水印機制的原理和使用方式。
Flink 在流應⽤程式中三種 Time 概念
Time 型別 | 備註 |
---|---|
Processing Time | 事件被機器處理的系統時間,提供最好的效能和最低的延遲。分支式非同步環境下,容易受到事件到達系統的速度,事件在系統內操作流動速度以及中斷的影響。 |
Event Time | 一般指資料本身攜帶的時間戳,能夠滿足在特定場景下資料準確性的需求。一般而言與 Processing Time 有時間延遲,需要引入水印機制處理事件亂序和時間亂序問題。 |
Ingestion Time | 事件進入 Flink 的時間。一般在 Flink Source 定義,提供給下游視窗計算的觸發計算。 |
⼀般來說,在⽣產環境中 Event Time 與 Processing Time 是常用的策略。
Flink 的 Window
Window 是無限資料流處理的核心,Window 將一個無限長的 stream 拆分成有限大小的 buckets ,我們可以在這些 buckets 上做計算操作。
Window 的組成
Apache Flink 為使用者提供了自定義 Window 的功能。自定的 Window 主要包含的元件為 Window assigner、 evictor 和 trigger,接下來將對其進行詳細分析。
1. 視窗分配器(Window Assinger)
視窗分配器定義了資料流中的元素如何分配到視窗中,通過在分組資料流中呼叫 .window(...) 或者非分組資料流中呼叫 .windowAll(...) 時指定視窗分配器(WindowAssigner)來實現。WindowAssigner 負責將每一個到來的元素分配給一個或者多個視窗(window), Flink 提供了一些常用的預定義的視窗分配器,即:滾動視窗、滑動視窗、會話視窗和全域性視窗。你也可以通過繼承 WindowAssigner 類來自定義自己的分配器。
檢視原始碼可以看⻅ WindowAssigner 這個抽象類有如下實現類:
常用的 WindowAssigner 實現類的功能介紹如下:
Assinger | 備註 |
---|---|
GlobalWindows | 所有的資料都分配到同一個視窗。 |
MergingWindowAssigner | 可 Merge 的視窗分配處理。 |
SlidingProcessingTimeWindows | 基於 Processing Time 的滾動視窗分配處理。 |
SlidingEventTimeWindows | 基於 Event Time 的滾動視窗分配處理。 |
TumblingProcessingTimeWindows | 基於 Processing Time 的滑動視窗分配處理。 |
TumblingEventTimeWindows | 基於 Event Time 的滑動視窗分配處理。 |
ProcessingTimeSessionWindows | 基於 Processing Time 且可 merge 的會話視窗分配處理。 |
EventTimeSessionWindows | 基於 Event Time 且可 merge 會話視窗分配處理。 |
2. 觸發器(Trigger)
觸發器決定了一個視窗何時可以被視窗函式處理,每一個視窗分配器都有一個預設的觸發器,該觸發器決定合適計算和清除視窗。如果預設的觸發器不能滿足你的需要,你可以通過呼叫 trigger(...)
來指定一個自定義的觸發器。觸發器的介面有5個方法來允許觸發器處理不同的事件:
-
onElement()
方法,每個元素被新增到視窗時呼叫 -
onEventTime()
方法,當一個已註冊的事件時間計時器啟動時呼叫 -
onProcessingTime()
方法,當一個已註冊的處理時間計時器啟動時呼叫 -
onMerge()
方法,與狀態性觸發器相關,當使用會話視窗時,兩個觸發器對應的視窗合併時,合併兩個觸發器的狀態。
每個觸發動作的返回結果⽤ TriggerResult 定。TriggerResult 有四種狀態:
-
CONTINUE
:什麼也不做 -
FIRE
:觸發計算 -
PURGE
:清除視窗中的資料 -
FIRE_AND_PURGE
:觸發計算並清除視窗中的資料
檢視原始碼可以看⻅ Trigger 這個抽象類有如下實現類:
常用的 Trigger 實現類的功能介紹如下:
Trigger | 備註 |
---|---|
EventTimeTrigger | 當水印通過視窗末尾時觸發的觸發器。 |
ProcessingTimeTrigger | 當系統時間通過視窗末尾時觸發的觸發器。 |
CountTrigger | 視窗元素達到閾值觸發的觸發器。 |
PurgingTrigger | 作為引數,使其成為帶有清除功能觸發器。 |
DeltaTrigger | 基於 DeltaFunction 和一個閾值的觸發器。 |
3.視窗驅逐器(Evictor)
Flink 的視窗模型允許指定一個除了 WindowAssigner 和 Trigger 之外的可選引數 Evitor,這個可以通過呼叫 evitor(...) 方法來實現。這個驅逐器(evitor)可以在觸發器觸發之前或者之後,或者視窗函式被應用之前清理視窗中的元素。如果沒有定義 Evictor,觸發器直接將所有窗⼝元素交給計算函式。
檢視原始碼可以看⻅ Evictor 這個抽象類有如下實現類:
常用的 Evictor 實現類的功能介紹如下:
Trigger | 備註 |
---|---|
TimeEvitor | 清除時間戳小於視窗元素中的最大時間戳 - interval的元素。 |
CountEvitor | 只儲存指定數量的資料。 |
DeltaEvitor | 通過一個 DeltaFunction 和一個閾值,計算視窗快取中最近的一個元素和剩餘的所有元素的 delta 值,並清除 delta 值大於或者等於閾值的元素。 |
Event Time 使用的場景和需要解決的問題
Event Time 場景⼀般是業務需求需要時間這個欄位,⽐如購物時是要先有下單事件、再有⽀付事件;借貸事件的⻛控是需要依賴時間來做判斷的;機器異常檢測觸發的告警也是要具體的異常事件的時間展示出來;商品⼴告及時精準推薦給⽤戶依賴的就是⽤戶在瀏覽商品的時間段/頻率/時⻓等。這些場景只能根據事件時間來處理資料。
當基於事件時間的資料流進⾏窗⼝計算時,由於 Flink 接收到的事件的先後順序並不是嚴格的按照事件的 Event Time 順序排列(會因為各種各樣的問題如⽹絡的抖動、裝置的故障、應⽤的異常等) ,最為困難的⼀點也就是如何確定對應當前窗⼝的事件已經全部到達。然⽽實際上並不能百分百的準確判斷,因此業界常⽤的⽅法就是基於已經收集的訊息來估算是否還有訊息未到達,這就是 Watermark 的思想。Watermark 本質來說就是⼀個時間戳,代表著⽐這時間戳早的事件已經全部到達窗⼝,即假設不會再有⽐這時間戳還⼩的事件到達,這個假設是觸發窗⼝計算的基礎,只有 Watermark ⼤於窗⼝對應的結束時間,窗⼝才會關閉和進⾏計算。按照這個標準去處理資料,那麼如果後⾯還有⽐這時間戳更⼩的資料,那麼就視為遲到的資料,對於這部分遲到的資料處理也是一個問題。
Watermark + window 處理亂序資料
在 Flink 中,資料處理中需要通過調⽤ DataStream 中的 assignTimestampsAndWatermarks ⽅法來分配時間和⽔印,該⽅法可以傳⼊兩種引數,⼀個是 AssignerWithPeriodicWatermarks,另⼀個是 AssignerWithPunctuatedWatermarks,通常建議在資料來源(source)之後就進⾏⽣成⽔印,或者做些簡單操作⽐如 filter/map/flatMap 之 後再⽣成⽔印,越早⽣成⽔印的效果會更好,也可以直接在資料來源頭就做⽣成⽔印。
1.AssignerWithPeriodicWatermarks
資料流中每一個遞增的 Event Time 都會產生一個 Watermark在實際的⽣產環境中,在 TPS 很⾼的情況下會產⽣⼤量的 Watermark,可能在⼀定程度上會對下游算⼦造成⼀定的壓⼒,所以只有在實時性要求⾮常⾼的場景才會選擇這種⽅式來進⾏⽔印的⽣成。而且新版 Flink 原始碼中已經標記為 @Deprecated
2. AssignerWithPeriodicWatermarks
週期性的產生一個 Watermark,但是必須結合時間或者積累條數兩個維度,否則在極端情況下會有很⼤的延時,所以週期性 Watermark 的⽣成⽅式需要根據業務場景的不同進⾏不同程度的除錯,以便達到理想的效果。
檢視原始碼可以看⻅ AssignerWithPeriodicWatermarks 這個抽象類有如下主要實現類:
-
BoundedOutOfOrdernessTimestampExtractor:
該類⽤來發出滯後於資料時間的⽔印,可以傳⼊⼀個時間代表著可以允許資料延遲到來的時間是多⻓,超過延遲時間的話如果還來了之前早的資料,那麼 Flink 就會丟棄了。
-
CustomWatermarkExtractor:這是⼀個⾃定義的週期性⽣成⽔印的類,在這個類⾥⾯的資料是 KafkaEvent。
Late Element(延遲資料)的處理
延遲資料三種處理方案
1. 丟棄(預設)
2. allowedLateness 指定允許資料延遲的時間
在某些情況下,我們希望對遲到的資料再提供一個寬容的時間。Flink 提供了 allowedLateness 方法可以實現對遲到的資料設定一個延遲時間,在指定延遲時 間內到達的資料還是可以觸發 window 執行的。呼叫 .allowedLateness(Time lateness)
3. sideOutputLateData 收集遲到的資料
通過 sideOutputLateData 可以把遲到的資料統一收集,統一儲存,方便後期排查問題。該⽅法會將延遲的資料傳送到給定 OutputTag 的 side output 中去,然後你可以通過 SingleOutputStreamOperator.getSideOutput(OutputTag) 來獲取這些延遲的資料。
在多並行度下的 Watermark 應用
在多並行度下(假設流程式存在 shuffle,存在一個運算元多個輸入的情況), Watermark 會在每個並行度的 source 處或者其他運算元內部新增,水印在資料流 shuffle 的過程中的合併方式是:Watermark 會對齊會取所有 channel 最小的 Watermark。
下圖顯示了多並行度下事件水印的合併方式。
以 Kafka Source 為例,通常每個 Kafka 分割槽的資料時間戳是遞增的(事件是有序的),但是當你作業設定多個並⾏度的時候,Flink 去消費 Kafka 資料流是並⾏的,那麼並⾏的去消費 Kafka 分 區的資料就會導致打亂原每個分割槽的資料時間戳的順序。在這種情況下,你可以使⽤ Flink 中的 Kafka-partition-aware 特性來⽣成⽔印,使⽤該特性後,⽔印會在 Kafka 消費端⽣成,然後每個 Kafka 分割槽和每個分割槽上的⽔印最後的合併⽅式和⽔印在資料流 shuffle 過程中的合併⽅式⼀ 致。
下面的插圖展示瞭如何使用每個kafka分割槽的水印生成,以及在這種情況下,水印如何通過資料流傳播。
Flink SQL 之 Watermark 的使用
在建立表的 DDL 中定義
事件時間屬性可以用 WATERMARK 語句在 CREATE TABLE DDL 中進行定義。WATERMARK 語句在一個已有欄位上定義一個 Watermark 生成表示式,同時標記這個已有欄位為時間屬性欄位。
CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time TIMESTAMP(3),
-- 宣告 user_action_time 是事件時間屬性,並且用 延遲 5 秒的策略來生成 watermark
WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
...
);
SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
如果源中的時間戳資料表示為一個 epoch time,通常是一個長值,例如 1618989564564,建議將事件時間屬性定義為 TIMESTAMP_LTZ 列
CREATE TABLE user_actions (
user_name STRING,
data STRING,
ts BIGINT,
time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
-- declare time_ltz as event time attribute and use 5 seconds delayed watermark strategy
WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
...
);
SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
在場景和最佳實踐方面,這裡引用一下雲+ 社群 騰訊雲流計算 Oceanus 專欄文章 。這裡可以找到關於 Flink的當下熱門的應用場景和最佳實踐,而且定時更新,極具參考價值。這裡就不做過多的介紹了。
流計算 Oceanus 是大資料產品生態體系的實時化分析利器,是基於 Apache Flink 構建的具備一站開發、無縫連線、亞秒延時、低廉成本、安全穩定等特點的企業級實時大資料分析平臺。流計算 Oceanus 以實現企業資料價值最大化為目標,加速企業實時化數字化的建設程序。
總結
本文從 Flink Watermark 涉及的基本的概念入手,闡述 Flink 水印機制的原理和使用方式。先後介紹了 Time 的型別,Windows 的組成,Event Time 和 Watermark 的使用場景和方式,重點是 Watermark 的設計方案如何解決視窗處理事件亂序和事件延遲的問題。拋轉引玉,希望通過本文的介紹,有更多的人瞭解和關注 FLink 相關機制和原理。希望大家多關注雲+ 社群 騰訊雲流計算 Oceanus,多多交流,相互學習,共同進步。
掃碼加入 流計算 Oceanus 產品交流群:point_down:
掃碼關注 「騰訊雲大資料」 ,瞭解騰訊雲流計算 Oceanus 更多資訊 ~
騰訊雲大資料
長按二維碼
關注我們
- 打造次世代分析型資料庫(七):向量化計算層快取
- 騰訊雲大資料ES:使用Elastic APM監控SpringBoot服務的最佳實踐
- 業內首個基於Iceberg的“雲端倉轉湖”生產實踐探索
- 從“家庭碼”到“線上會議”,融入疫情生活的大資料
- 騰訊雲ES:一站式接入,資料鏈路視覺化重磅來襲!
- 打造次世代分析型資料庫(三):列存表最佳實踐
- Elastic Stack最佳實踐系列:Beats->ES,一個更輕型的架構選擇
- Flink資源排程模型
- 最佳實踐:MySQL CDC 同步資料到 ES
- 騰訊雲ES:一站式配置,TKE容器日誌採集與分析就是這麼簡單!
- 速度提升10倍,騰訊基於Iceberg的資料治理與優化實踐
- Flink 實踐教程:入門(12):元資料的使用
- Flink Metrics&REST API 介紹和原理解析
- Flink 最佳實踐:TDSQL Connector 的使用(上)
- Flink Watermark 機制及總結
- Flink 實踐教程-進階(10):自定義聚合函式(UDAF)
- Flink 實踐教程-進階(9):自定義表值函式(UDTF)
- 資料分析小結:使用流計算 Oceanus(Flink) SQL 作業進行資料型別轉換
- Flink 實踐教程-進階(8):自定義標量函式(UDF)
- 實時數倉:基於 Flink CDC 實現 Oracle 資料實時更新到 Kudu