面試官: Flink雙流JOIN瞭解嗎? 簡單說說其實現原理

語言: CN / TW / HK
摘要:今天和大家聊聊Flink雙流Join問題。這是一個高頻面試點,也是工作中常遇到的一種真實場景。

本文分享自華為雲社群《萬字直通面試:Flink雙流JOIN》,作者:大資料兵工廠 。

如何保證Flink雙流Join準確性和及時性、除了視窗join還存在哪些實現方式、究竟如何回答才能完全打動面試官呢。。你將在本文中找到答案。

1 引子

1.1 資料庫SQL中的JOIN

我們先來看看資料庫SQL中的JOIN操作。如下所示的訂單查詢SQL,通過將訂單表的id和訂單詳情表order_id關聯,獲取所有訂單下的商品資訊。

select 
   a.id as '訂單id',
   a.order_date as '下單時間',
   a.order_amount as '訂單金額',
   b.order_detail_id as '訂單詳情id',
   b.goods_name as '商品名稱',
   b.goods_price as '商品價格',
   b.order_id as '訂單id'
from 
   dwd_order_info_pfd a
right join 
   dwd_order_detail_pfd b
on a.id = b.order_id

這是一段很簡單的SQL程式碼,就不詳細展開敘述了。此處主要引出SQL中的JOIN型別,這裡用到的是 right join , 即右連線。

  • left join: 保留左表全部資料和右表關聯資料,右表非關聯資料置NULL
  • right join: 保留右表全部資料和左表關聯資料,左表非關聯資料置NULL
  • inner join: 保留左表關聯資料和右邊關聯資料
  • cross join: 保留左表和右表資料笛卡爾積

基於關聯鍵值逐行關聯匹配,過濾表資料並生成最終結果,提供給下游資料分析使用。

就此打住,關於資料庫SQL中的JOIN原理不再多贅述,感興趣的話大家可自行研究,下面我們將目光轉移到大資料領域看看吧。

1.2 離線場景下的JOIN

假設存在這樣一個場景:

已知Mysql資料庫中訂單表和訂單明細表,且滿足一對多的關係,統計T-1天所有訂單的商品分佈詳情。

聰明的大家肯定已經給出了答案,沒錯~就是上面的SQL:

select a.*, b.*
from 
   dwd_order_info_pfd a
right join 
   dwd_order_detail_pfd b
on a.id = b.order_id

現在修改下條件:已知訂單表和訂單明細表均為億級別資料,求相同場景下的分析結果。

咋辦?此時關係型資料庫貌似不大合適了~開始放大招:使用大資料計算引擎來解決。

考慮到T-1統計場景對時效性要求很低,可以使用Hive SQL來處理,底層跑Mapreduce任務。如果想提高執行速度,換成Flink或Spark計算引擎,使用記憶體計算。

至於查詢SQL和上面一樣,並將其封裝成一個定時排程任務, 等系統排程執行。如果結果不正確的話,由於資料來源和資料靜態不變,大不了重跑,看起來感覺皆大歡喜~

可是好景不長,產品冤家此時又給了你一個無法拒絕的需求:我要實時統計!!

2 實時場景下的JOIN

還是上面的場景,此時資料來源換成了實時訂單流和實時訂單明細流,比如Kafka的兩個topic,要求實時統計每分鐘內所有訂單下的商品分佈詳情。

現在情況貌似變得複雜了起來,簡單分析下:

  1. 資料來源。實時資料流,和靜態流不同,資料是實時流入的且動態變化,需要計算程式支援實時處理機制。
  2. 關聯性。前面提到靜態資料執行多次join操作,左表和右表能關聯的資料是很恆定的;而實時資料流(左右表)如果進入時機不一致,原本可以關聯的資料會關聯不上或者發生錯誤。
  3. 延遲性。實時統計,提供分鐘甚至秒級別響應結果。

由於流資料join的特殊性,在滿足實時處理機制、低延遲、強關聯性的前提下,看來需要制定完善的資料方案,才能實現真正的流資料JOIN。

2.1 方案思路

我們知道訂單資料和訂單明細資料是一對多的關係,即一條訂單資料對應著多條商品明細資料,畢竟買一件商品也是那麼多郵費,不如打包團購。。而一條明細資料僅對應一條訂單資料。

這樣,雙流join策略可以考慮如下思路:

  • 當資料流為訂單資料時。無條件保留,無論當前是否關聯到明細資料,均留作後續join使用。
  • 當資料流為明細資料時。在關聯到其訂單資料後,就可以say goodbye了,否則暫時保留等待下一次與訂單資料的邂逅。
  • 完成所有處於同一時段內的訂單資料和訂單明細資料join, 清空儲存狀態

實際生產場景中,需要考慮更多的複雜情況,包括JOIN過程的資料丟失等異常情況的處理,此處僅示意。

好了,看起來我們已經有了一個馬馬虎虎的實時流JOIN方案雛形。

貌似可以準備動手大幹一場了~ 彆著急,有人已經幫我們偷偷的實現了:Apache Flink

3 Flink的雙流JOIN

Apache Flink 是一個框架和分散式處理引擎,用於對無界和有界資料流進行有狀態計算。Flink 被設計在所有常見的叢集環境中執行,以記憶體執行速度和任意規模來執行計算。
——來自Flink官網定義

這裡我們只需要知道Flink是一個實時計算引擎就行了,主要關注其如何實現雙流JOIN。

3.1 內部執行機制

  • 記憶體計算:Flink任務優先在記憶體中計算,記憶體不夠時儲存到訪問高效的磁碟,提供秒級延遲響應。
  • 狀態強一致性:Flink使用一致性快照儲存狀態,並定期檢查本地狀態、持久儲存來保證狀態一致性。
  • 分散式執行: Flink應用程式可以劃分為無數個並行任務在叢集中執行,幾乎無限量使用CPU、主記憶體、磁碟和網路IO。
  • 內建高階程式設計模型:Flink程式設計模型抽象為SQL、Table、DataStream|DataSet API、Process四層,並封裝成豐富功能的運算元,其中就包含JOIN型別的運算元。

仔細看看,我們前面章節討論的實時流JOIN方案的前提是否都滿足了呢?

  1. 實時處理機制: Flink天生即實時計算引擎
  2. 低延遲: Flink記憶體計算秒級延遲
  3. 強關聯性: Flink狀態一致性和join類運算元

不由感嘆, 這個Flink果然強啊~

保持好奇心,我們去瞅瞅Flink雙流join的真正奧義!!

3.2 JOIN實現機制

Flink雙流JOIN主要分為兩大類。一類是基於原生State的Connect運算元操作,另一類是基於視窗的JOIN操作。其中基於視窗的JOIN可細分為window join和interval join兩種。

  • 實現原理:底層原理依賴Flink的State狀態儲存,通過將資料儲存到State中進行關聯join, 最終輸出結果。

恍然大悟, Flink原來是通過State狀態來快取等待join的實時流。

這裡給大家丟擲一個問題:

用redis儲存可不可以,state儲存相比redis儲存的區別?

更多細節歡迎大家一起探討,添加個人微信: youlong525 拉您進群,還有免費Flink PDF領取~

回到正題,這幾種方式到底是如何實現雙流JOIN的?我們接著往下看。

注意: 後面內容將多以文字 + 程式碼的形式呈現,避免枯燥,我放了一堆原創示意圖~

4 基於Window Join的雙流JOIN實現機制

顧名思義,此類方式利用Flink的視窗機制實現雙流join。通俗理解,將兩條實時流中元素分配到同一個時間視窗中完成Join。

  • 底層原理: 兩條實時流資料快取在Window State中,當視窗觸發計算時,執行join操作。

4.1 join運算元

先看看Window join實現方式之一的join運算元。這裡涉及到Flink中的視窗(window)概念,因此Window Joinan按照視窗型別區分的話某種程度來說可以細分出3種:

  • Tumbling Window Join (滾動視窗)

  • Sliding Window Join (滑動視窗)

  • Session Widnow Join(會話視窗)

兩條流資料按照關聯主鍵在(滾動、滑動、會話)視窗內進行inner join, 底層基於State儲存,並支援處理時間和事件時間兩種時間特徵,看下原始碼:

原始碼核心總結:windows視窗 + state儲存 + 雙層for迴圈執行join()

現在讓我們把時間軸往回拉一點點,在實時場景JOIN那裡我們收到了這樣的需求:統計每分鐘內所有訂單下的商品明細分佈。

OK, 使用join運算元小試牛刀一下。我們定義60秒的滾動視窗,將訂單流和訂單明細流通過order_id關聯,得到如下的程式:

val env = ...
// kafka 訂單流
val orderStream = ... 
// kafka 訂單明細流
val orderDetailStream = ...
 
orderStream.join(orderDetailStream)
    .where(r => r._1)  //訂單id
    .equalTo(r => r._2) //訂單id
    .window(TumblingProcessTimeWindows.of(
          Time.seconds(60)))
    .apply {(r1, r2) => r1 + " : " + r2}
    .print()

整個程式碼其實很簡單,概要總結下:

  • 定義兩條輸入實時流A、B
  • A流呼叫join(b流)運算元
  • 關聯關係定義: where為A流關聯鍵,equalTo為B流關聯鍵,都是訂單id
  • 定義window視窗(60s間隔)
  • apply方法定義邏輯輸出

這樣只要程式穩定執行,就能夠持續不斷的計算每分鐘內訂單分佈詳情,貌似解決問題了奧~

還是別高興太早,別忘了此時的join型別是inner join。複習一下知識: inner join指的是僅保留兩條流關聯上的資料。

這樣雙流中沒關聯上的資料豈不是都丟掉了?別擔心,Flink還提供了另一個window join操作: coGroup運算元。

4.2 coGroup運算元

coGroup運算元也是基於window視窗機制,不過coGroup運算元比Join運算元更加靈活,可以按照使用者指定的邏輯匹配左流或右流資料並輸出。

換句話說,我們通過自己指定雙流的輸出來達到left join和right join的目的。

現在來看看在相同場景下coGroup運算元是如何實現left join:

#這裡看看java運算元的寫法
orderDetailStream
  .coGroup(orderStream)
  .where(r -> r.getOrderId())
  .equalTo(r -> r.getOrderId())
  .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
  .apply(new CoGroupFunction<OrderDetail, Order, Tuple2<String, Long>>() {
    @Override
    public void coGroup(Iterable<OrderDetail> orderDetailRecords, Iterable<Order> orderRecords, Collector<Tuple2<String, Long>> collector)  {
      for (OrderDetail orderDetaill : orderDetailRecords) {
        boolean flag = false;
        for (Order orderRecord : orderRecords) {
          // 右流中有對應的記錄
          collector.collect(new Tuple2<>(orderDetailRecords.getGoods_name(), orderDetailRecords.getGoods_price()));
          flag = true;
        }
        if (!flag) {
          // 右流中沒有對應的記錄
          collector.collect(new Tuple2<>(orderDetailRecords.getGoods_name(), null));
        }
      }
    }
  })
  .print();

這裡需要說明幾點:

  • join運算元替換為coGroup運算元
  • 兩條流依然需要在一個window中且定義好關聯條件
  • apply方法中自定義判斷,此處對右值進行判斷:如果有值則進行連線輸出,否則右邊置為NULL。

可以這麼說,現在我們已經徹底搞定了視窗雙流JOIN。

只要你給我提供具體的視窗大小,我就能通過join或coGroup運算元鼓搗出各種花樣join,而且使用起來特別簡單。

但是假如此時我們親愛的產品又提出了一個小小問題:

大促高峰期,商品資料某時段會寫入不及時,時間可能比訂單早也可能比訂單晚,還是要計算每分鐘內的訂單商品分佈詳情,沒問題吧~

當然有問題:兩條流如果步調不一致,還用視窗來控制能join的上才怪了~ 很容易等不到join流視窗就自動關閉了。

還好,我知道Flink提供了Interval join機制。

5 基於Interval Join的雙流JOIN實現機制

Interval Join根據右流相對左流偏移的時間區間(interval)作為關聯視窗,在偏移區間視窗中完成join操作。

有點不好理解,我畫個圖看下:

stream2.time ∈ (stream1.time +low, stream1.time +high)

滿足資料流stream2在資料流stream1的 interval(low, high)偏移區間內關聯join。interval越大,關聯上的資料就越多,超出interval的資料不再關聯。

  • 實現原理:interval join也是利用Flink的state儲存資料,不過此時存在state失效機制ttl,觸發資料清理操作。

這裡再引出一個問題:

state的ttl機制需要怎麼設定?不合理的ttl設定會不會撐爆記憶體?

我會在後面的文章中深入講解下State的ttl機制,歡迎大家一起探討~

下面簡單看下interval join的程式碼實現過程:

val env = ...
// kafka 訂單流
val orderStream = ... 
// kafka 訂單明細流
val orderDetailStream = ...
 
orderStream.keyBy(_.1)
    // 呼叫intervalJoin關聯
    .intervalJoin(orderDetailStream._2)
    // 設定時間上限和下限
    .between(Time.milliseconds(-30), Time.milliseconds(30))  
    .process(new ProcessWindowFunction())
 
class ProcessWindowFunction extends ProcessJoinFunction...{
   override def processElement(...) {
      collector.collect((r1, r2) => r1 + " : " + r2)
   }
}

訂單流在流入程式後,等候(low,high)時間間隔內的訂單明細流資料進行join, 否則繼續處理下一個流。

從程式碼中我們發現,interval join需要在兩個KeyedStream之上操作,即keyBy(),並在between()方法中指定偏移區間的上下界。

需要注意的是interval join實現的也是inner join,且目前只支援事件時間。

6 基於Connect的雙流JOIN實現機制

前面在使用Window join或者Interval Join來實現雙流join的時候,我發現了其中的共性:

無論哪種實現方式,Flink內部都將join過程透明化,在運算元中封裝了所有的實現細節。

這是什麼?是程式語言中的抽象概念~ 隱藏底層細節,對外暴露統一API, 大幅簡化程式編碼。

可是這樣會引來一個問題:如果程式報錯或者資料異常,如何快速進行調優排查,直接看原始碼嗎?不大現實。。

這裡介紹基於Connect運算元實現的雙流JOIN方法,我們可自己控制雙流JOIN處理邏輯,同時保持過程時效性和準確性。

6.1 Connect運算元原理

對兩個DataStream執行connect操作,將其轉化為ConnectedStreams, 生成的Streams可以呼叫不同方法在兩個實時流上執行,且雙流之間可以共享狀態。

圖上我們可以看到,兩個資料流被connect之後,只是被放在了同一個流中,內部依然保持各自的資料和形式,兩個流相互獨立。

[DataStream1, DataStream2] -> ConnectedStreams[1,2]

這樣,我們可以在Connect運算元底層的ConnectedStreams中編寫程式碼,自行實現雙流JOIN的邏輯處理。

6.2 技術實現

1.呼叫connect運算元,根據orderid進行分組,並使用process運算元分別對兩條流進行處理。

orderStream.connect(orderDetailStream)
  .keyBy("orderId", "orderId")
  .process(new orderProcessFunc());

2.process方法內部進行狀態程式設計, 初始化訂單、訂單明細和定時器的ValueState狀態。

private ValueState<OrderEvent> orderState;
private ValueState<TxEvent> orderDetailState;
private ValueState<Long> timeState;

// 初始化狀態Value
orderState = getRuntimeContext().getState(
 new ValueStateDescriptor<Order>
 ("order-state",Order.class));
····

3.為每個進入的資料流儲存state狀態並建立定時器。在時間視窗內另一個流達到時進行join並輸出,完成後刪除定時器。

@Override
public void processElement1(Order value, Context ctx, Collector<Tuple2<Order, OrderDetail>> out){
  if (orderDetailState.value() == null){
    //明細資料未到,先把訂單資料放入狀態
     orderState.update(value);
    //建立定時器,60秒後觸發
     Long ts = (value.getEventTime()+10)*1000L;
     ctx.timerService().registerEventTimeTimer(
       ts);
     timeState.update(ts);
  }else{
    //明細資料已到,直接輸出到主流
     out.collect(new Tuple2<>(value,orderDetailS
       tate.value()));
    //刪除定時器
     ctx.timerService().deleteEventTimeTimer
      (timeState.value());
     //清空狀態,注意清空的是支付狀態
      orderDetailState.clear();
      timeState.clear();
  }
}
...
@Override
public void processElement2(){
  ...
}

4.未及時達到的資料流觸發定時器輸出到側輸出流,左流先到而右流未到,則輸出左流,反之輸出右連流。

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<Order, OrderDetail>> out) {
  // 實現左連線
   if (orderState.value() != null){
       ctx.output(new OutputTag<String>("left-jo 
       in") {}, 
       orderState.value().getTxId());
   // 實現右連線
   }else{
      ctx.output(new OutputTag<String>("left-jo 
       in") {}, 
       orderDetailState.value().getTxId());
   }
   orderState.clear();
   orderDetailState.clear();
   timeState.clear();
}

總體思想: 基於資料時間實現訂單資料及訂單明細資料的關聯,超時或者缺失則由側輸出流輸出。

在connect中針對訂單流和訂單明細流,先建立定時器並儲存state狀態,處於視窗內就進行join, 否則進入側輸出流。

7 雙流JOIN的優化與總結

  • 為什麼我的雙流join時間到了卻不觸發,一直沒有輸出

檢查一下watermark的設定是否合理,資料時間是否遠遠大於watermark和視窗時間,導致視窗資料經常為空

  • state資料儲存多久,會記憶體爆炸嗎

state自帶有ttl機制,可以設定ttl過期策略,觸發Flink清理過期state資料。建議程式中的state資料結構用完後手動clear掉。

  • 我的雙流join傾斜怎麼辦

join傾斜三板斧: 過濾異常key、拆分表減少資料、打散key分佈。當然可以的話我建議加記憶體!加記憶體!加記憶體!!

  • 想實現多流join怎麼辦

目前無法一次實現,可以考慮先union然後再二次處理;或者先進行connnect操作再進行join操作,僅建議~

  • join過程延遲、沒關聯上的資料會丟失嗎

這個一般來說不會,join過程可以使用側輸出流儲存延遲流;如果出現節點網路等異常,Flink checkpoint也可以保證資料不丟失。

某日

面試官: Flink雙流join瞭解嗎? 簡單說說其實現原理。

某君: Flink雙流JOIN是。。。

 

點選關注,第一時間瞭解華為雲新鮮技術~