RocketMQ-Streams架構設計淺析

語言: CN / TW / HK

*作者:倪澤,RocketMQ 資深貢獻者, RocketMQ-Streams 維護者之一,阿里雲技術專家。 * RocketMQ-Streams 是一款輕量級流處理引擎,應用以SDK 的形式嵌入並啟動,即可進行流處理計算,不依賴於其他元件,最低1核1G可部署,在資源敏感場景具有很大優勢。同時它支援 UTF/UTAF/UTDF 多種計算型別。目前已經廣泛運用於安全,風控,邊緣計算等場景。

本期將帶領大家從原始碼的角度,解析RocketMQ-Streams的構建,資料流轉過程。也會討論RocketMQ-Streams是如何實現故障恢復和擴縮容的。

使用示例

程式碼示例:

public class RocketMQWindowExample {
    public static void main(String[] args) {
        DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
        source.fromRocketmq(
                "topicName",
                "groupName",
                false,
                "namesrvAddr")
                .map(message -> JSONObject.parseObject((String) message))
                .window(TumblingWindow.of(Time.seconds(10)))
                .groupBy("groupByKey")
                .sum("欄位名", "輸出別名")
                .count("total")
                .waterMark(5)
                .setLocalStorageOnly(true)
                .toDataSteam()
                .toPrint(1)
                .start();
    }
}

pom檔案依賴:

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-streams-clients</artifactId>
  <version>1.0.1-preview</version>
</dependency>

上述程式碼是一個簡單的使用例子,它主要的功能是從RocketMQ中指定topic讀取資料,經過轉化成JSON格式,以groupByKey欄位值分組、10秒一個視窗,對OutFlow欄位值進行累加,結果輸出到total欄位,並列印到控制檯上。上述計算中還允許輸入亂序5秒,即視窗時間到達後不會馬上觸發,而是會等待5s,如果這個段時間內,有視窗資料到達依然有效。上述setLocalStorageOnly為true表示不對狀態進行遠端儲存,僅使用RocksDB做本地儲存。目前1.0.1的RocketMQ-Streams版本依然使用Mysql作為遠端狀態儲存,下一版本將使用RocketMQ作為遠端狀態儲存。

RocketMQ總體架構圖

在這裡插入圖片描述

RocketMQ-Streams 作為輕量流處理引擎,本質上是作為RocketMQ 的客戶端消費資料,一個流處理例項可以處理多個佇列,而一個佇列只能被一個例項消費。若干RocketMQ-Streams 例項組成消費者組共同消費資料,通過擴容例項達到增加處理能力的消費,減少例項則會發生rebalance,消費的佇列自動重平衡到其他消費例項上。從上述圖中,我們還可以看出計算例項間不需要直接交換任何資料,可各自獨立完成所有計算處理。這種架構簡化了RocketMQ-Streams 本身的設計,同時也可非常方便的進行例項擴縮容。

處理拓撲 處理器拓撲為應用定義了流處理過程的計算邏輯,它由一系列的處理器節點和資料流向組成。例如,在開頭的程式碼示例中,整個處理拓撲由source、map、groupBy、sum、count、print等處理節點組成。有兩種特殊的處理節點:

• source節點 他沒有任何上游節點,從外部讀入資料到RocketMQ-Streams,並交由下游處理。 • sink節點 他沒有任何下游節點,他將處理後的資料寫出到外部。

處理拓撲僅僅是流處理程式碼的邏輯抽象,在流計算啟動時將會被例項化。為了設計簡單,目前一個流處理例項中僅有一張計算拓撲。 在所有流處理運算元之中,有兩種特別的運算元,一種是涉及資料分組的運算元groupBy,另一種是有狀態計算例如count等。這兩種運算元會影響整個計算拓撲的構建,下面將具體分析RocketMQ-Streams是如何處理他們的。

groupBy 分組運算元groupBy特殊是因為經過groupBy操作,後續運算元期望對相同key的資料進行操作,例如經過groupBy("年級")之後再進行sum就是對按照年級分組求和,這就要求需要將具有相同“年級”的資料重新路由到一個流計算例項上處理,如果不這樣做,每個例項上得出的結果都將是不完整的,整體輸出結果也將是錯誤的。

RocketMQ-Streams 採用 shuffle topic 這種方式來處理。具體說來,計算例項將groupBy資料重新發回RocketMQ的一個topic,並且在發回過程中按照key的hash值來選擇目標佇列,再從這個topic讀取資料進行後續流處理。按照key hash後相同的key一定在一個佇列裡面,而一個佇列只會被一個流處理例項消費,這樣就達到相同key被路由到一個例項上處理的效果。

有狀態運算元 有狀態運算元與無狀態運算元相對。如果計算結果只與當前輸入有關,和上一次輸入無關就是無狀態運算元,例如filter、map、foreach結果只與當前輸入有關係。還有一種運算元的輸出結果不僅與當前運算元有關係還與上一次輸入有關,例如sum,需要對一段時間內輸入進行求和,他就是有狀態運算元。 RocketMQ-Streams利用RocksDB作為本地儲存,Mysql作為遠端儲存來儲存狀態資料。他具體做法是:

  1. 當發現訊息來自新的佇列時,檢查是否需要載入狀態,如果需要非同步載入狀態到RocksDB。
  2. 資料到達有狀態運算元時,如果載入完成使用RocksDB中狀態進行計算,如果沒有,使用Mysql中狀態計算。
  3. 計算完成後,將狀態資料儲存到RocksDB和Mysql中。
  4. 視窗觸發後,從RocksDB中查詢出狀態資料,並將結果向下遊運算元傳遞。

整體資料流向圖如下: 在這裡插入圖片描述

擴縮容與故障恢復

擴縮容和故障恢復是一個硬幣的兩面,即同一個事物的兩種表達,計算叢集如果能正確擴縮容就等於具備故障恢復的能力,反之亦然。通過前面介紹我們知道,RocketMQ-Streams具有非常良好的擴縮容效能,擴容時只需要新部署一個流計算例項即可,縮容時停止計算例項即可。對於無狀態的計算來說比較簡單,擴容後,資料計算不需要之前的狀態。有狀態計算的擴縮容涉及到狀態的遷移。有狀態的擴縮容可由下圖表示: 在這裡插入圖片描述

當計算例項從3個縮容到2個,藉助於RocketMQ的rebalance,MQ會在計算例項之間重新分配。

Instance1上消費的MQ2和MQ3被分配到Instance2和Instance3上,這兩個MQ的狀態資料也需要遷移到Instance2和Instance3上,這也暗示,狀態資料是根據源資料分片儲存的;擴容則是剛好相反的過程。

具體實現上,RocketMQ-Streams採用系統訊息來觸發狀態的載入和持久化。

系統訊息類別:

//新增消費佇列
NewSplitMessage

//不在消費某個佇列
RemoveSplitMessage

//客戶端持久化消費位點到MQ
CheckPointMessage

當發現訊息來自一個新的RocketMQ佇列(MessageQueue),RocketMQ-Streams之前沒有處理過來自該佇列的訊息,會先於資料前傳送NewSplitMessage訊息,通過處理拓撲下游運算元傳遞,當有狀態運算元收到該訊息時會將新增佇列對應的狀態載入到本地記憶體RocksDB中,當資料真正到達時,就根據這個狀態繼續計算。

當因為計算例項增加或者RocketMQ叢集變動,rebalance後,計算例項不再消費某個佇列(MessageQueue)時,會發出RemoveSplitMessage訊息,有狀態運算元刪除本地RocksDB中的狀態。

CheckPointMessage是一種特別的系統訊息,他的作用與實現exactly-once有關。我們在擴縮容過程中需要做到exactly-once,才能保證擴縮容或故障恢復對計算結果沒有影響。RocketMQ-streams向broker提交消費offset前會產生CheckPointMessage訊息,向下遊拓撲傳遞,他將保證即將提交消費位點的所有訊息都已經被sink處理掉。

開源地址: RocketMQ-Streams 倉庫地址: https://github.com/apache/rocketmq-streams RocketMQ 倉庫地址: https://github.com/apache/rocketmq

加入 Apache RocketMQ 社群

十年鑄劍,Apache RocketMQ 的成長離不開全球接近 500 位開發者的積極參與貢獻,相信在下個版本你就是 Apache RocketMQ 的貢獻者,在社群不僅可以結識社群大牛,提升技術水平,也可以提升個人影響力,促進自身成長。

社群 5.0 版本正在進行著如火如荼的開發,另外還有接近 30 個 SIG(興趣小組)等你加入,歡迎立志打造世界級分散式系統的同學加入社群,新增社群開發者微信:rocketmq666 即可進群,參與貢獻,打造下一代訊息、事件、流融合處理平臺。