RocketMQ 消費者(1)概念和消費流程
1. 背景
RocketMQ 的消費可以算是 RocketMQ 的業務邏輯中最複雜的一塊。這裡面涉及到許多消費模式和特性。本想一篇文章寫完,寫到後面發現消費涉及到的內容太多,於是決定分多篇來寫。本文作為消費系列的第一篇,主要講述 RocketMQ 消費涉及到的模式和特性,也會概括性地講一下消費流程。
我將 RocketMQ 的消費流程大致分成 4 個步驟
- 重平衡
- 消費者拉取訊息
- Broker 接收拉取請求後從儲存中查詢訊息並返回
- 消費者消費訊息
每個步驟都會用一篇文章來講解。
先了解一下 RocketMQ 消費涉及到地概念
2. 概念簡述
2.1 消費組概念與消費模式
和大多數訊息佇列一樣,RocketMQ 支援兩種訊息模式:叢集消費(Clustering)和廣播消費(Broadcasting)。在瞭解它們之前,需要先引入消費組的概念。
2.1.1 消費組
一個消費者例項即是一個消費者程序,負責消費訊息。單個消費者速度有限,在實際使用中通常會採用多個消費者共同消費同樣的 Topic 以加快消費速度。這多個消費同樣 Topic 的消費者組成了消費者組。
消費組是一個邏輯概念,它包含了多個同一類的消費者例項,通常這些消費者都消費同一類訊息(都消費相同的 Topic)且消費邏輯一致。
消費組的引入是用來在消費訊息時更好地進行負載均衡和容錯。
2.1.2 廣播消費模式(BROADCASTING)
廣播消費模式即全部的訊息會廣播分發到所有的消費者例項,每個消費者例項會收到全量的訊息(即便消費組中有多個消費者都訂閱同一 Topic)。
如下圖所示,生產者傳送了 5 條訊息,每個消費組中的消費者都收到全部的 5 條訊息。
廣播模式使用較少,適合各個消費者都需要通知的場景,如重新整理應用中的快取。
注意事項:
- 廣播消費模式下不支援 順序訊息。
- 廣播消費模式下不支援 重置消費位點。
- 每條訊息都需要被相同訂閱邏輯的多臺機器處理。
- 消費進度在客戶端維護,出現重複消費的概率稍大於叢集模式。如果消費進度檔案丟失,存在訊息丟失的可能。
- 廣播模式下,訊息佇列 RocketMQ 版保證每條訊息至少被每臺客戶端消費一次,但是並不會重投消費失敗的訊息,因此業務方需要關注消費失敗的情況。
- 廣播模式下,客戶端每一次重啟都會從最新訊息消費。客戶端在被停止期間傳送至服務端的訊息將會被自動跳過,請謹慎選擇。
- 廣播模式下,每條訊息都會被大量的客戶端重複處理,因此推薦儘可能使用叢集模式。
- 廣播模式下服務端不維護消費進度,所以訊息佇列 RocketMQ 版控制檯不支援訊息堆積查詢、訊息堆積報警和訂閱關係查詢功能。
2.1.3 叢集消費模式(CLUSTERING)
叢集消費模式下,同一 Topic 下的一條訊息只會被同一消費組中的一個消費者消費。也就是說,訊息被負載均衡到了同一個消費組的多個消費者例項上。
更具體一點,在同一消費組中的不同消費者會根據負載機制來平均地訂閱 Topic 中的每個 Queue。(預設 AVG 負載方式)
RocketMQ 預設使用叢集消費模式,這也是大部分場景下會使用到的消費模式。
2.2 消費者拉取訊息模式
2.2.1 Pull
指消費者主動拉取訊息進行消費,主動從 Broker 拉取訊息,主動權由消費者應用控制。
2.2.2 Push
指 Broker 主動將訊息 Push 給消費者,Broker 收到訊息就會主動推送到消費者端。該模式的消費實時性較高,也是主流場景中普遍採用的消費形式。
消費者組中的消費者例項會根據預設的負載均衡演算法對 Topic 中的 Queue 進行均勻的訂閱,每個 Queue 最多隻能被一個消費者訂閱。
在 RocketMQ 中,Push 消費其實也是由 Pull 消費(拉取)實現。Push 消費只是通過客戶端 API 層面的封裝讓使用者感覺像是 Broker 在推送訊息給消費者。
2.2.3 POP
RocketMQ 5.0 引入的新消費形式,是 Pull 拉取的另一種實現。也可以在 Push 模式下使用 POP 拉取訊息,甚至可以和 Push 模式共同使用(分別消費重試 Topic 和普通 Topic)。
POP 與 Pull 可以通過一個開關實時進行切換。POP 模式下,Broker 來控制每個消費者消費的佇列和拉取的訊息,把重平衡邏輯從客戶端移到了服務端。
主要解決了原來 Push 模式消費的以下痛點:
- 富客戶端:客戶端邏輯比較重,多語言支援不友好
- 佇列獨佔:Topic 中的一個 Queue 最多隻能被 1 個 Push 消費者消費,消費者數量無法無限擴充套件。且消費者 hang 住時該佇列的訊息會堆積。
- 消費後更新 offset:本地消費成功才會提交 offset
RocketMQ 5.0 的輕量化 gRPC 客戶端就是基於 POP 消費模式開發
2.3 佇列負載機制與重平衡
在叢集消費模式下,消費組中的消費者共同消費訂閱的 Topic 中的所有訊息,這裡就存在 Topic 中的佇列如何分配給消費者的問題。
2.3.1 佇列負載機制
RocketMQ Broker 中的佇列負載機制將一個 Topic 的不同佇列按照演算法儘可能平均地分配給消費者組中的所有消費者。RocketMQ 預設了多種負載演算法供不同場景下的消費。
AVG:將佇列按數量平均分配給多個消費者,按 Broker 順序先分配第一個 Broker 的所有佇列給第一個消費者,然後給第二個。
AVG_BY_CIRCLE:將 Broker 上的佇列輪流分給不同消費者,更適用於 Topic 在不同 Broker 之間分佈不均勻的情況。
預設採用 AVG 負載方式。
2.3.2 重平衡(Rebalance)
為消費者分配佇列消費的這一個負載過程並不是一勞永逸的,比如當消費者數量變化、Broker 掉線等情況發生後,原先的負載就變得不再均衡,此時就需要重新進行負載均衡,這一過程被稱為重平衡機制。
每隔 20s,RocketMQ 會進行一次檢查,檢查佇列數量、消費者數量是否發生變化,如果變化則觸發消費佇列重平衡,重新執行上述負載演算法。
2.4 消費端高可靠
2.4.1 重試-死信機制
在實際使用中,訊息的消費可能出現失敗。RocketMQ 擁有重試機制和死信機制來保證訊息消費的可靠性。
-
正常消費:消費成功則提交消費位點
-
重試機制:如果正常消費失敗,訊息會被消費者發回 Broker,放入重試 Topic:
%RETRY%消費者組
。最多重試消費 16 次,重試的時間間隔逐漸變長。(消費者組會自動訂閱重試 Topic)。
這裡地延遲重試採用了 RocketMQ 的延遲訊息,重試的 16 次時間間隔為延遲訊息配置的每個延遲等級的時間(從第三個等級開始)。如果修改延遲等級時間的配置,重試的時間間隔也會相應發生變化。但即便延遲等級時間間隔配置不足 16 個,仍會重試 16 次,後面按照最大的時間間隔來重試。
- 死信機制:如果正常消費和重試 16 次均失敗,訊息會儲存到死信 Topic
%DLQ%消費者組
中,此時需人工介入處理
2.4.2 佇列負載機制與重平衡
當發生 Broker 掛掉或者消費者掛掉時,會引發重平衡,可以自動感知有元件掛掉的情況並重新調整消費者的訂閱關係。
2.5 併發消費與順序消費
在消費者客戶端消費時,有兩種訂閱訊息的方式,分別是併發消費和順序消費。廣播模式不支援順序消費,僅有叢集模式能使用順序消費。
需要注意的是,這裡所說的順序消費指的是佇列維度的順序,即在消費一個佇列時,消費訊息的順序和訊息傳送的順序一致。如果一個 Topic 有多個佇列, 是不可能達成 Topic 級別的順序消費的,因為無法控制哪個佇列的訊息被先消費。Topic 只有一個佇列的情況下能夠實現 Topic 級別的順序消費。
具體順序生產和消費程式碼見 官方文件。
順序生產的方式為序列生產,並在生產時指定佇列。
併發消費的方式是呼叫消費者的指定 MessageListenerConcurrently
作為消費的回撥類,順序消費則使用 MessageListenerOrderly
類進行回撥。處理這兩種消費方式的消費服務也不同,分別是 ConsumeMessageConcurrentlyService
和 ConsumeMessageOrderlyService
。
順序消費的大致原理是依靠兩組鎖,一組在 Broker 端(Broker 鎖),鎖定佇列和消費者的關係,保證同一時間只有一個消費者在消費;在消費者端也有一組鎖(消費佇列鎖)以保證消費的順序性。
2.6 消費進度儲存和提交
消費者消費一批訊息完成之後,需要儲存消費進度。如果是叢集消費模式,還需要將消費進度讓其他消費者知道,所以需要提交消費進度。這樣在消費者重啟或佇列重平衡時可以根據消費進度繼續消費。
不同模式下消費進度儲存方式的不同:
- 廣播模式:儲存在消費者本地。因為每個消費者都需要消費全量訊息訊息。在
LocalfileOffsetStore
當中。 - 叢集模式:儲存在 Broker,同時消費者端快取。因為一個 Topic 的訊息只要被消費者組中的一個消費者消費即可,所以訊息的消費進度需要統一儲存。通過
RemoteBrokerOffsetStore
儲存。
叢集模式下,消費者端有定時任務,定時將記憶體中的消費進度提交到 Broker,Broker 也有定時任務將記憶體中的消費偏移量持久化到磁碟。此外,消費者向 Broker 拉取訊息時也會提交消費偏移量。注意,消費者執行緒池提交的偏移量是執行緒池消費的這一批訊息中偏移量最小的訊息的偏移量。
- 消費完一批訊息後將訊息消費進度存在本地記憶體
- 消費者中有一個定時執行緒,每 5s 將記憶體中所有佇列的消費偏移量提交到 Broker
- Broker 收到消費進度先快取到記憶體,有一個定時任務每隔 5s 將訊息偏移量持久化到磁碟
- 消費者向 Broker 拉取訊息時也會將佇列的訊息偏移量提交到 Broker
3. 消費流程
這張圖是阿里雲的文章講解消費時用到的,能夠清晰地表示客戶端 Push 模式併發消費流程。
從左上角第一個方框開始看
- 消費者啟動時喚醒重平衡服務
RebalanceService
,重平衡服務是客戶端開始消費的起點。 - 重平衡服務會週期性(每 20s)執行重平衡方法
doRebalance)
,查詢所有註冊的 Broker,根據註冊的 Broker 數量為自身分配負載的佇列rebalanceByTopic()
- 分配完佇列後,會為每個分配到的新佇列建立一個訊息拉取請求
pullRequest
,這個拉取請求中儲存一個處理佇列processQueue
,即圖中的紅黑樹(TreeMap
),用來儲存拉取到的訊息。紅黑樹儲存訊息的順序。 - 訊息拉取執行緒應用生產-消費模式,用一個執行緒從拉取請求佇列
pullRequestQueue
中彈出拉取請求,執行拉取任務,將拉取到的訊息放入處理佇列。 - 拉取請求在一次拉取訊息完成之後會複用,重新被放入拉取請求佇列
pullRequestQueue
中 - 拉取完成後,在
NettyClientPublicExecutorThreadPool
執行緒池非同步處理結果,將拉取到的訊息放入處理佇列,然後呼叫consumeMessageService.submitConsumeRequest
,將處理佇列和 多個消費任務提交到消費執行緒池。每個消費任務消費 1 批訊息(1 批預設為 1 條) - 每個消費者都有一個消費執行緒池
consumeMessageThreadPool
,預設有 20 個消費執行緒。 - 消費執行緒池的每個消費執行緒會嘗試從消費任務佇列中獲取消費請求,執行消費業務邏輯
listener.consumeMessage
。 - 消費完成後,如果消費成功,則更新偏移量
updateOffset
(先更新到記憶體offsetTable
,定時上報到 Broker。Broker 端也先放到記憶體,定時刷盤)。