RocketMQ 延時方案分析與總結

語言: CN / TW / HK

政採雲技術團隊.png

知行.png

一.需求背景 (Background & Motivation)

1.1 概念 (Concept)

  • 定時訊息:Producer 將訊息傳送到訊息佇列 RocketMQ 版服務端,但並不期望立馬投遞這條訊息,而是推遲到在當前時間點之後的某一個時間投遞到 Consumer 進行消費,該訊息即定時訊息。
  • 延時訊息:Producer 將訊息傳送到訊息佇列 RocketMQ 版服務端,但並不期望立馬投遞這條訊息,而是延遲一定時間後才投遞到 Consumer 進行消費,該訊息即延時訊息。

定時訊息與延時訊息在程式碼配置上存在一些差異,但是最終達到的效果相同:訊息在傳送到訊息佇列 RocketMQ 版服務端後並不會立馬投遞,而是根據訊息中的屬性延遲固定時間後才投遞給消費者。

1.2 解決何種問題 (What problem is this proposal designed to solve?)

目前在RocketMQ(社群版本)的功能中,已經支援了延時訊息。 但對於該功能只支援固定粒度的延遲級別(18級), 如:

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。

雖然這些粒度可以配置,但是還是無法應對廣泛的業務場景。

比如定義了 1s 、5s 的 Level,那麼使用者只能傳送 1s 或 5s 的延時訊息,不能傳送 3s 延遲的訊息。

對比RocketMQ(阿里雲)版本已經支援了 40 天內任意時延的訊息。

(注:在本文中將討論如何在 RocketMQ(社群版本)改造任意時延訊息的方案。)

二.目標 (Goals)

在RocketMQ(社群版本)中新增任意時延訊息的功能:

  • 精度支援到秒級別

  • 最大支援 30 天的延遲

三.困難點 (Difficulties)

那麼任意時延訊息的難點在哪?

  • 排序

    對於服務端收到的延時訊息,需要對投遞時間進行排序。在 MQ 中,為了保證可靠性,訊息是需要落盤的,且對效能和延遲的要求,決定了在服務端對訊息進行排序是完全不可接受的。

  • 訊息儲存

    對於延時訊息來說,需要儲存最近30天的訊息。而 RocketMQ 是基於 WAL,對於高流量的場景下,儲存 30 天的訊息需要多少伺服器。

四.設計思路 (Design)

4.1 原延時方案 (Original scheme)

先來看一下原延時方案的設計:

對於 normal msg 來說:

  • 訊息會被順序寫入 commitlog;

  • 在非同步 reput 中,通過 DispatchService 生成對應的 consumer queue 索引;

  • 消費者即可以消費對應的訊息。

對於 delay msg 來說:

  • 訊息在被寫入 commitlog 前,會將 topic 設定為 SCHEDULE_TOPIC,並將 real topic 儲存在 properties 中,並且將 queueId 設定為 delay level 的值;

  • 在非同步 reput中,通過 DispatchService 會生成 SCHEDULE_TOPIC 的 consumer queue,共18個;

  • 在 ScheduleMessageService 中給每個 level 設定定時器,從 ScheduledConsumeQueue 中讀取資訊。如果 ScheduledConsumeQueue 中的元素已經到時,那麼從 CommitLog 中讀取訊息內容,恢復訊息內容寫入 CommitLog,再按 normal msg 消費。

方案分析:

  • 優點1:將排序轉化為分類

  • 優點2:Level數固定即執行緒數量固定,開銷不大

  • 缺點1:固定Level,不夠靈活

  • 缺點2:若延時很長,會導致CommitLog的量很大

4.2 新延時方案 (New scheme)

4.2.1 如何解決排序

我們可以用原方案類似的方式,用分類操作解決排序。

可以使用 TimeWheel:《Hashed and Hierarchical Timing Wheels: Data Structures for the Efficient Implementation of a Timer Facility》

我們只需要將即將傳送的訊息載入到TimeWheel中,再按時間取出delay msg將其放入real topic中。

這裡的“即將傳送的訊息”可以定義為 30 分鐘(timeSliceLength)內的訊息,TimeWheel中每一個格子為1秒,那麼一個 TimeWheel 為 1800格。

這裡我們可以估算一下TimeWheel的記憶體需求,按每個訊息 500B,那麼 200萬 個訊息則需要 1G 的記憶體。

我們可以根據延遲訊息的量,來設定 timeSliceLength 的大小。以下我們將以 timeSliceLength=1800 展開討論。

4.2.2 如何解決訊息儲存

將延時訊息單獨儲存至 CommitLog(以下將此稱為“DCommitLog”)

那麼如何設計 DCommitLog?我們可以將時間段按 timeSliceLength 進行雜湊,將整個 timeSliceLength 時間內的 delay msg 放入同一個 DCommitLog 中: ```

公式

SliceTime = timestamp / timeSliceLength / 100

若delay msg的投遞時間為 1641349066282,則帶入公式得到

1641349066282 / 1800 / 1000 = 911860

即該訊息會被寫入 911860 的 DCommitLog 中。

```

4.2.3 延時方案

有了以上基本的概念,我們設計一下整個延時方案:

ScheduleMessageService 主要完成 DispatchMsg 和 TimeWheelTask。

  • DispatchMsg 對 SCHEDULE_TOPIC 按 delayOffset 進行內部消費,並根據 deliveryTime 將訊息投遞到 real topic、TimeWheel、DCommitLog。
  • TimeWheelTask 則按 currentTime 從 TimeWheel 取出訊息投遞到 real topic 並提前載入下一個時間片的訊息、切換 TimeWheel。這裡的 currentTime 是 TimeWheel 所用的,而非現實的時間戳。
  • delayOffsetcurrentTime 需要落盤到 delayOffset.json。

  1. 訊息在被寫入 commitlog 前,會將 topic 設定為 SCHEDULE_TOPIC,並將 real topic 儲存在 properties 中,並且將 queueId 設定為1;
  2. consume with delayOffset:根據 delayOffset 消費 SCHEDULE_TOPIC,得到 delay msg;
  3. 根據 delay msg 計算 deliveryTime,若 deliveryTime < currentTime,即訊息已經超時直接將消費放入 real topic,進入7,否則進入4;
  4. 若 deliverySliceTime == currentSliceTime,即訊息在本時間片內,將其放入 TimeWheel 和 DCommiLog 中,進入7,否則進入5;
  5. 若 deliverySliceTime == currentSliceTime +1 && nextTimeWheel != null,即訊息在下一時間片並且下一時間輪已經載入,則將訊息放入 nextTimeWheel 和 DCommitLog,進入7,否則進入6;
  6. 訊息在後面的時間片後,直接將其放入 DCommiLog 中;
  7. 根據 currentTime 從 TimeWheel 獲取需要投遞到 real topic 中的訊息集合;
  8. 投遞到 real topic;
  9. 若還沒有開始載入下一個時間片的資料,未開始載入進入10,已開始則進入11;
  10. 若 currentTime 接近 nextTimeSlice,非同步載入下一個時間片 DCommiLog 中的資料到下一個時間輪中,否則12;
  11. 若 currentTime 已經到 nextTimeSlice,則將時間輪切換至下一個時間輪;
  12. 更新 currentTime、delayOffset,每次 currentTime 最多增長1秒,且不能超過 system.currentTimeMillis(),進入2;

4.3 總結 (Summary)

以上流程,分為 DispatchMsg、TimeWheelTask,DispatchMsg 分發 delay msg 將其歸類到 DCommitLog 中,TimeWheelTask 則取出 DCommitLog 放入 TimeWheel 中進行時間排程並將 msg 放入 real topic 中。

方案待優化

DCommitLog 會帶來隨機讀寫的問題,用 mmp 的同時也會影響 CommitLog 的 PageCache。

可以考慮 normal broker 和 delay broker 分離部署的方式來解決或者調整 DCommitLog 的檔案大小優化。

另外因為 TimeWheel 存在在記憶體中,對於 JVM 的記憶體需求也會隨之上升。

4.3.1 對應的MASTER-SLAVE、DLeger模式需要作何處理?

  • 在 MASTER-SLAVE 中,由於 SLAVE 沒有寫入許可權 TimeWheelTask 則不需要啟動,而 SLAVE 不會自動切換 Master,也就不需要生成 DCommitLog;只能重啟 master;

  • 在 DLeger 模式中,SLAVE 同樣沒有寫入許可權,TimeWheelTask 不需要啟動。但是 SLAVE會自動切換 Master,此時需要生成 DCommitLog 用於在容災恢復後保證 delay msg 可以正常被訊息。這裡需要將 Matser的delayOffset.json 的資料同步到 Slave 上。

4.3.2 對重試策略的影響

由於重試依賴於原延時方案的固定粒度時間間隔,在使用本方案改造後需要對 sendMessageBack 做額外重試策略的處理。

4.3.3 方案注意事項

1、DCommitLog 會帶來隨機讀寫的問題,用 mmp 的同時也會影響 CommitLog 的 PageCache。可以考慮 normal broker 和 delay broker 分離部署的方式來解決或者調整 DCommitLog 的檔案大小優化。

2、因為 TimeWheel 存在在記憶體中,對於 JVM 的記憶體需求也會隨之上升。對於延遲訊息使用較高的場景需要特別注意合理設定 JVM 大小。優化:TimeWheel 中的 msg 改為 msg 的 offset,可明顯降低 JVM 的記憶體需求,1G=6250萬 條訊息

以上就是《RocketMQ延時方案分析與總結》, 希望能給你在工作中起到輔助作用,如有任何問題可以在評論區中提出你的想法和建議。

推薦閱讀

Dapr 實戰(一)

Dapr 實戰(二)

政採雲Flutter低成本螢幕適配方案探索

招賢納士

政採雲技術團隊(Zero),一個富有激情、創造力和執行力的團隊,Base 在風景如畫的杭州。團隊現有300多名研發小夥伴,既有來自阿里、華為、網易的“老”兵,也有來自浙大、中科大、杭電等校的新人。團隊在日常業務開發之外,還分別在雲原生、區塊鏈、人工智慧、低程式碼平臺、中介軟體、大資料、物料體系、工程平臺、效能體驗、視覺化等領域進行技術探索和實踐,推動並落地了一系列的內部技術產品,持續探索技術的新邊界。此外,團隊還紛紛投身社群建設,目前已經是 google flutter、scikit-learn、Apache Dubbo、Apache Rocketmq、Apache Pulsar、CNCF Dapr、Apache DolphinScheduler、alibaba Seata 等眾多優秀開源社群的貢獻者。如果你想改變一直被事折騰,希望開始折騰事;如果你想改變一直被告誡需要多些想法,卻無從破局;如果你想改變你有能力去做成那個結果,卻不需要你;如果你想改變你想做成的事需要一個團隊去支撐,但沒你帶人的位置;如果你想改變本來悟性不錯,但總是有那一層窗戶紙的模糊……如果你相信相信的力量,相信平凡人能成就非凡事,相信能遇到更好的自己。如果你希望參與到隨著業務騰飛的過程,親手推動一個有著深入的業務理解、完善的技術體系、技術創造價值、影響力外溢的技術團隊的成長過程,我覺得我們該聊聊。任何時間,等著你寫點什麼,發給 [email protected]

微信公眾號

文章同步釋出,政採雲技術團隊公眾號,歡迎關注

政採雲技術團隊.png