面試官: Flink雙流JOIN瞭解嗎? 簡單說說其實現原理
摘要:今天和大家聊聊Flink雙流Join問題。這是一個高頻面試點,也是工作中常遇到的一種真實場景。
本文分享自華為雲社群《萬字直通面試:Flink雙流JOIN》,作者:大資料兵工廠 。
如何保證Flink雙流Join準確性和及時性、除了視窗join還存在哪些實現方式、究竟如何回答才能完全打動面試官呢。。你將在本文中找到答案。
1 引子
1.1 資料庫SQL中的JOIN
我們先來看看資料庫SQL中的JOIN操作。如下所示的訂單查詢SQL,通過將訂單表的id和訂單詳情表order_id關聯,獲取所有訂單下的商品資訊。
select
a.id as '訂單id',
a.order_date as '下單時間',
a.order_amount as '訂單金額',
b.order_detail_id as '訂單詳情id',
b.goods_name as '商品名稱',
b.goods_price as '商品價格',
b.order_id as '訂單id'
from
dwd_order_info_pfd a
right join
dwd_order_detail_pfd b
on a.id = b.order_id
這是一段很簡單的SQL程式碼,就不詳細展開敘述了。此處主要引出SQL中的JOIN型別,這裡用到的是 right join , 即右連線。
- left join: 保留左表全部資料和右表關聯資料,右表非關聯資料置NULL
- right join: 保留右表全部資料和左表關聯資料,左表非關聯資料置NULL
- inner join: 保留左表關聯資料和右邊關聯資料
- cross join: 保留左表和右表資料笛卡爾積
基於關聯鍵值逐行關聯匹配,過濾表資料並生成最終結果,提供給下游資料分析使用。
就此打住,關於資料庫SQL中的JOIN原理不再多贅述,感興趣的話大家可自行研究,下面我們將目光轉移到大資料領域看看吧。
1.2 離線場景下的JOIN
假設存在這樣一個場景:
已知Mysql資料庫中訂單表和訂單明細表,且滿足一對多的關係,統計T-1天所有訂單的商品分佈詳情。
聰明的大家肯定已經給出了答案,沒錯~就是上面的SQL:
select a.*, b.*
from
dwd_order_info_pfd a
right join
dwd_order_detail_pfd b
on a.id = b.order_id
現在修改下條件:已知訂單表和訂單明細表均為億級別資料,求相同場景下的分析結果。
咋辦?此時關係型資料庫貌似不大合適了~開始放大招:使用大資料計算引擎來解決。
考慮到T-1統計場景對時效性要求很低,可以使用Hive SQL來處理,底層跑Mapreduce任務。如果想提高執行速度,換成Flink或Spark計算引擎,使用記憶體計算。
至於查詢SQL和上面一樣,並將其封裝成一個定時排程任務, 等系統排程執行。如果結果不正確的話,由於資料來源和資料靜態不變,大不了重跑,看起來感覺皆大歡喜~
可是好景不長,產品冤家此時又給了你一個無法拒絕的需求:我要實時統計!!
2 實時場景下的JOIN
還是上面的場景,此時資料來源換成了實時訂單流和實時訂單明細流,比如Kafka的兩個topic,要求實時統計每分鐘內所有訂單下的商品分佈詳情。
現在情況貌似變得複雜了起來,簡單分析下:
- 資料來源。實時資料流,和靜態流不同,資料是實時流入的且動態變化,需要計算程式支援實時處理機制。
- 關聯性。前面提到靜態資料執行多次join操作,左表和右表能關聯的資料是很恆定的;而實時資料流(左右表)如果進入時機不一致,原本可以關聯的資料會關聯不上或者發生錯誤。
- 延遲性。實時統計,提供分鐘甚至秒級別響應結果。
由於流資料join的特殊性,在滿足實時處理機制、低延遲、強關聯性的前提下,看來需要制定完善的資料方案,才能實現真正的流資料JOIN。
2.1 方案思路
我們知道訂單資料和訂單明細資料是一對多的關係,即一條訂單資料對應著多條商品明細資料,畢竟買一件商品也是那麼多郵費,不如打包團購。。而一條明細資料僅對應一條訂單資料。
這樣,雙流join策略可以考慮如下思路:
- 當資料流為訂單資料時。無條件保留,無論當前是否關聯到明細資料,均留作後續join使用。
- 當資料流為明細資料時。在關聯到其訂單資料後,就可以say goodbye了,否則暫時保留等待下一次與訂單資料的邂逅。
- 完成所有處於同一時段內的訂單資料和訂單明細資料join, 清空儲存狀態
實際生產場景中,需要考慮更多的複雜情況,包括JOIN過程的資料丟失等異常情況的處理,此處僅示意。
好了,看起來我們已經有了一個馬馬虎虎的實時流JOIN方案雛形。
貌似可以準備動手大幹一場了~ 彆著急,有人已經幫我們偷偷的實現了:Apache Flink
3 Flink的雙流JOIN
Apache Flink 是一個框架和分散式處理引擎,用於對無界和有界資料流進行有狀態計算。Flink 被設計在所有常見的叢集環境中執行,以記憶體執行速度和任意規模來執行計算。
——來自Flink官網定義
這裡我們只需要知道Flink是一個實時計算引擎就行了,主要關注其如何實現雙流JOIN。
3.1 內部執行機制
- 記憶體計算:Flink任務優先在記憶體中計算,記憶體不夠時儲存到訪問高效的磁碟,提供秒級延遲響應。
- 狀態強一致性:Flink使用一致性快照儲存狀態,並定期檢查本地狀態、持久儲存來保證狀態一致性。
- 分散式執行: Flink應用程式可以劃分為無數個並行任務在叢集中執行,幾乎無限量使用CPU、主記憶體、磁碟和網路IO。
- 內建高階程式設計模型:Flink程式設計模型抽象為SQL、Table、DataStream|DataSet API、Process四層,並封裝成豐富功能的運算元,其中就包含JOIN型別的運算元。
仔細看看,我們前面章節討論的實時流JOIN方案的前提是否都滿足了呢?
- 實時處理機制: Flink天生即實時計算引擎
- 低延遲: Flink記憶體計算秒級延遲
- 強關聯性: Flink狀態一致性和join類運算元
不由感嘆, 這個Flink果然強啊~
保持好奇心,我們去瞅瞅Flink雙流join的真正奧義!!
3.2 JOIN實現機制
Flink雙流JOIN主要分為兩大類。一類是基於原生State的Connect運算元操作,另一類是基於視窗的JOIN操作。其中基於視窗的JOIN可細分為window join和interval join兩種。
- 實現原理:底層原理依賴Flink的State狀態儲存,通過將資料儲存到State中進行關聯join, 最終輸出結果。
恍然大悟, Flink原來是通過State狀態來快取等待join的實時流。
這裡給大家丟擲一個問題:
用redis儲存可不可以,state儲存相比redis儲存的區別?
更多細節歡迎大家一起探討,添加個人微信: youlong525 拉您進群,還有免費Flink PDF領取~
回到正題,這幾種方式到底是如何實現雙流JOIN的?我們接著往下看。
注意: 後面內容將多以文字 + 程式碼的形式呈現,避免枯燥,我放了一堆原創示意圖~
4 基於Window Join的雙流JOIN實現機制
顧名思義,此類方式利用Flink的視窗機制實現雙流join。通俗理解,將兩條實時流中元素分配到同一個時間視窗中完成Join。
- 底層原理: 兩條實時流資料快取在Window State中,當視窗觸發計算時,執行join操作。
4.1 join運算元
先看看Window join實現方式之一的join運算元。這裡涉及到Flink中的視窗(window)概念,因此Window Joinan按照視窗型別區分的話某種程度來說可以細分出3種:
- Tumbling Window Join (滾動視窗)
- Sliding Window Join (滑動視窗)
- Session Widnow Join(會話視窗)
兩條流資料按照關聯主鍵在(滾動、滑動、會話)視窗內進行inner join, 底層基於State儲存,並支援處理時間和事件時間兩種時間特徵,看下原始碼:
原始碼核心總結:windows視窗 + state儲存 + 雙層for迴圈執行join()
現在讓我們把時間軸往回拉一點點,在實時場景JOIN那裡我們收到了這樣的需求:統計每分鐘內所有訂單下的商品明細分佈。
OK, 使用join運算元小試牛刀一下。我們定義60秒的滾動視窗,將訂單流和訂單明細流通過order_id關聯,得到如下的程式:
val env = ...
// kafka 訂單流
val orderStream = ...
// kafka 訂單明細流
val orderDetailStream = ...
orderStream.join(orderDetailStream)
.where(r => r._1) //訂單id
.equalTo(r => r._2) //訂單id
.window(TumblingProcessTimeWindows.of(
Time.seconds(60)))
.apply {(r1, r2) => r1 + " : " + r2}
.print()
整個程式碼其實很簡單,概要總結下:
- 定義兩條輸入實時流A、B
- A流呼叫join(b流)運算元
- 關聯關係定義: where為A流關聯鍵,equalTo為B流關聯鍵,都是訂單id
- 定義window視窗(60s間隔)
- apply方法定義邏輯輸出
這樣只要程式穩定執行,就能夠持續不斷的計算每分鐘內訂單分佈詳情,貌似解決問題了奧~
還是別高興太早,別忘了此時的join型別是inner join。複習一下知識: inner join指的是僅保留兩條流關聯上的資料。
這樣雙流中沒關聯上的資料豈不是都丟掉了?別擔心,Flink還提供了另一個window join操作: coGroup運算元。
4.2 coGroup運算元
coGroup運算元也是基於window視窗機制,不過coGroup運算元比Join運算元更加靈活,可以按照使用者指定的邏輯匹配左流或右流資料並輸出。
換句話說,我們通過自己指定雙流的輸出來達到left join和right join的目的。
現在來看看在相同場景下coGroup運算元是如何實現left join:
#這裡看看java運算元的寫法
orderDetailStream
.coGroup(orderStream)
.where(r -> r.getOrderId())
.equalTo(r -> r.getOrderId())
.window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
.apply(new CoGroupFunction<OrderDetail, Order, Tuple2<String, Long>>() {
@Override
public void coGroup(Iterable<OrderDetail> orderDetailRecords, Iterable<Order> orderRecords, Collector<Tuple2<String, Long>> collector) {
for (OrderDetail orderDetaill : orderDetailRecords) {
boolean flag = false;
for (Order orderRecord : orderRecords) {
// 右流中有對應的記錄
collector.collect(new Tuple2<>(orderDetailRecords.getGoods_name(), orderDetailRecords.getGoods_price()));
flag = true;
}
if (!flag) {
// 右流中沒有對應的記錄
collector.collect(new Tuple2<>(orderDetailRecords.getGoods_name(), null));
}
}
}
})
.print();
這裡需要說明幾點:
- join運算元替換為coGroup運算元
- 兩條流依然需要在一個window中且定義好關聯條件
- apply方法中自定義判斷,此處對右值進行判斷:如果有值則進行連線輸出,否則右邊置為NULL。
可以這麼說,現在我們已經徹底搞定了視窗雙流JOIN。
只要你給我提供具體的視窗大小,我就能通過join或coGroup運算元鼓搗出各種花樣join,而且使用起來特別簡單。
但是假如此時我們親愛的產品又提出了一個小小問題:
大促高峰期,商品資料某時段會寫入不及時,時間可能比訂單早也可能比訂單晚,還是要計算每分鐘內的訂單商品分佈詳情,沒問題吧~
當然有問題:兩條流如果步調不一致,還用視窗來控制能join的上才怪了~ 很容易等不到join流視窗就自動關閉了。
還好,我知道Flink提供了Interval join機制。
5 基於Interval Join的雙流JOIN實現機制
Interval Join根據右流相對左流偏移的時間區間(interval)作為關聯視窗,在偏移區間視窗中完成join操作。
有點不好理解,我畫個圖看下:
stream2.time ∈ (stream1.time +low, stream1.time +high)
滿足資料流stream2在資料流stream1的 interval(low, high)偏移區間內關聯join。interval越大,關聯上的資料就越多,超出interval的資料不再關聯。
- 實現原理:interval join也是利用Flink的state儲存資料,不過此時存在state失效機制ttl,觸發資料清理操作。
這裡再引出一個問題:
state的ttl機制需要怎麼設定?不合理的ttl設定會不會撐爆記憶體?
我會在後面的文章中深入講解下State的ttl機制,歡迎大家一起探討~
下面簡單看下interval join的程式碼實現過程:
val env = ...
// kafka 訂單流
val orderStream = ...
// kafka 訂單明細流
val orderDetailStream = ...
orderStream.keyBy(_.1)
// 呼叫intervalJoin關聯
.intervalJoin(orderDetailStream._2)
// 設定時間上限和下限
.between(Time.milliseconds(-30), Time.milliseconds(30))
.process(new ProcessWindowFunction())
class ProcessWindowFunction extends ProcessJoinFunction...{
override def processElement(...) {
collector.collect((r1, r2) => r1 + " : " + r2)
}
}
訂單流在流入程式後,等候(low,high)時間間隔內的訂單明細流資料進行join, 否則繼續處理下一個流。
從程式碼中我們發現,interval join需要在兩個KeyedStream之上操作,即keyBy(),並在between()方法中指定偏移區間的上下界。
需要注意的是interval join實現的也是inner join,且目前只支援事件時間。
6 基於Connect的雙流JOIN實現機制
前面在使用Window join或者Interval Join來實現雙流join的時候,我發現了其中的共性:
無論哪種實現方式,Flink內部都將join過程透明化,在運算元中封裝了所有的實現細節。
這是什麼?是程式語言中的抽象概念~ 隱藏底層細節,對外暴露統一API, 大幅簡化程式編碼。
可是這樣會引來一個問題:如果程式報錯或者資料異常,如何快速進行調優排查,直接看原始碼嗎?不大現實。。
這裡介紹基於Connect運算元實現的雙流JOIN方法,我們可自己控制雙流JOIN處理邏輯,同時保持過程時效性和準確性。
6.1 Connect運算元原理
對兩個DataStream執行connect操作,將其轉化為ConnectedStreams, 生成的Streams可以呼叫不同方法在兩個實時流上執行,且雙流之間可以共享狀態。
圖上我們可以看到,兩個資料流被connect之後,只是被放在了同一個流中,內部依然保持各自的資料和形式,兩個流相互獨立。
[DataStream1, DataStream2] -> ConnectedStreams[1,2]
這樣,我們可以在Connect運算元底層的ConnectedStreams中編寫程式碼,自行實現雙流JOIN的邏輯處理。
6.2 技術實現
1.呼叫connect運算元,根據orderid進行分組,並使用process運算元分別對兩條流進行處理。
orderStream.connect(orderDetailStream)
.keyBy("orderId", "orderId")
.process(new orderProcessFunc());
2.process方法內部進行狀態程式設計, 初始化訂單、訂單明細和定時器的ValueState狀態。
private ValueState<OrderEvent> orderState;
private ValueState<TxEvent> orderDetailState;
private ValueState<Long> timeState;
// 初始化狀態Value
orderState = getRuntimeContext().getState(
new ValueStateDescriptor<Order>
("order-state",Order.class));
····
3.為每個進入的資料流儲存state狀態並建立定時器。在時間視窗內另一個流達到時進行join並輸出,完成後刪除定時器。
@Override
public void processElement1(Order value, Context ctx, Collector<Tuple2<Order, OrderDetail>> out){
if (orderDetailState.value() == null){
//明細資料未到,先把訂單資料放入狀態
orderState.update(value);
//建立定時器,60秒後觸發
Long ts = (value.getEventTime()+10)*1000L;
ctx.timerService().registerEventTimeTimer(
ts);
timeState.update(ts);
}else{
//明細資料已到,直接輸出到主流
out.collect(new Tuple2<>(value,orderDetailS
tate.value()));
//刪除定時器
ctx.timerService().deleteEventTimeTimer
(timeState.value());
//清空狀態,注意清空的是支付狀態
orderDetailState.clear();
timeState.clear();
}
}
...
@Override
public void processElement2(){
...
}
4.未及時達到的資料流觸發定時器輸出到側輸出流,左流先到而右流未到,則輸出左流,反之輸出右連流。
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<Order, OrderDetail>> out) {
// 實現左連線
if (orderState.value() != null){
ctx.output(new OutputTag<String>("left-jo
in") {},
orderState.value().getTxId());
// 實現右連線
}else{
ctx.output(new OutputTag<String>("left-jo
in") {},
orderDetailState.value().getTxId());
}
orderState.clear();
orderDetailState.clear();
timeState.clear();
}
總體思想: 基於資料時間實現訂單資料及訂單明細資料的關聯,超時或者缺失則由側輸出流輸出。
在connect中針對訂單流和訂單明細流,先建立定時器並儲存state狀態,處於視窗內就進行join, 否則進入側輸出流。
7 雙流JOIN的優化與總結
- 為什麼我的雙流join時間到了卻不觸發,一直沒有輸出
檢查一下watermark的設定是否合理,資料時間是否遠遠大於watermark和視窗時間,導致視窗資料經常為空
- state資料儲存多久,會記憶體爆炸嗎
state自帶有ttl機制,可以設定ttl過期策略,觸發Flink清理過期state資料。建議程式中的state資料結構用完後手動clear掉。
- 我的雙流join傾斜怎麼辦
join傾斜三板斧: 過濾異常key、拆分表減少資料、打散key分佈。當然可以的話我建議加記憶體!加記憶體!加記憶體!!
- 想實現多流join怎麼辦
目前無法一次實現,可以考慮先union然後再二次處理;或者先進行connnect操作再進行join操作,僅建議~
- join過程延遲、沒關聯上的資料會丟失嗎
這個一般來說不會,join過程可以使用側輸出流儲存延遲流;如果出現節點網路等異常,Flink checkpoint也可以保證資料不丟失。
某日
面試官: Flink雙流join瞭解嗎? 簡單說說其實現原理。
某君: Flink雙流JOIN是。。。
- 手繪圖解java類載入原理
- 關於加密通道規範,你真正用的是TLS,而非SSL
- 程式碼重構,真的只有複雜化一條路嗎?
- 解讀分散式排程平臺Airflow在華為雲MRS中的實踐
- 透過例項demo帶你認識gRPC
- 帶你聚焦GaussDB(DWS)儲存時遊標使用
- 傳統到敏捷的轉型中,誰更適合做Scrum Master?
- 輕鬆解決研發知識管理難題
- Java中觀察者模式與委託,還在傻傻分不清
- 如何使用Python實現影象融合及加法運算?
- 什麼是強化學習?
- 探索開源工作流引擎Azkaban在MRS中的實踐
- GaussDB(DWS) NOT IN優化技術解密:排他分析場景400倍效能提升
- Java中觀察者模式與委託,還在傻傻分不清
- Java中的執行緒到底有哪些安全策略
- 一圖詳解java-class類檔案原理
- Java中的執行緒到底有哪些安全策略
- 擺平各類目標檢測識別AI應用,有它就夠了!
- KeyDB重量釋出6.3.0開源版,華為深度參與貢獻
- 如何使用Tomcat實現WebSocket即時通訊服務服務端