基於 RocketMQ 的 MQTT 服務架構在小米的實踐

語言: CN / TW / HK

1.jpeg

本文作者:房成進 - 小米高階研發工程師

小米 MQTT應用場景

2.png

小米之家門店的支付通知是小米MQTT落地的重要場景之一,流程如上圖所示。店員通過終端傳送下單請求到後端服務,後端在接收到下單請求後,呼叫支付服務,等待使用者付款。門店終端如何知道本次付款是否成功呢?

我們採用MQTT協議來實現支付訊息的通知。支付服務將本次訂單的支付結果釋出到MQTT 服務的一個 Topic中,門店終端與服務保持長連線,訂閱 Topic來實時獲取支付結果,從而進行下一步操作如打印發票等。得益於 TCP長連線和MQTT協議的輕量化,門店終端系統的支付響應能力從 200 毫秒下降至 10 毫秒內,MQTT服務釋出端到訂閱端的平均延時為2.6ms。

2.png

手機智慧製造工廠是小米MQTT落地的另一個核心場景。MQTT主要應用於裝置狀態資料採集以及裝置控制指令下發。上圖右側為小米智慧製造工廠架構圖。

上行鏈路流程為:手機生產線上的眾多工業裝置會將操作日誌、裝置引數、環境引數等通過工業控制層釋出到MQTT服務,MQTT服務的儲存層通過資料整合任務將資料打入大資料系統,進行資料的分析、建模和處理等,最後實現最上層應用工業 BI 和數字孿生的需求。

下行鏈路流程為:工廠的工作人員通過雲端服務將控制指令下發到MQTT叢集,生產線上的裝置與MQTT服務叢集保持長連線,以接受來自雲端的控制指令並執行相應動作。這兩個鏈路對時效性要求很高。目前, MQTT 服務能保證上行和下行鏈路延時在 20ms以內,服務可用性為99.95%。

小米 MQTT服務架構演進過程

3.png

早期,小米主要基於RocketMQ 社群在 18 年開源的RocketMQ-IoT-Bridge來構建自己的 MQTT 服務。RocketMQ-IoT-Bridge為單機架構,一是不支援水平擴充套件,總連線數存在瓶頸,自然無法保證高可用。二是資料無法持久化,只提供記憶體儲存,一旦重啟服務,必然導致訊息丟失。三是隻支援MQTT 協議QoS0,訊息存在丟失風險,無法滿足小米的業務要求。如圖所示,服務整體為單機服務架構,釋出端和訂閱端連線到同一個程序。

4.png

小米基於單機的架構進行了一系列的拓展。高可用方面,從單機變為分散式的可擴充套件架構,連線數從單機的 5 萬變為可橫向擴充套件的模式;可靠性方面,在QoS0 的基礎上實現了MQTT協議規定的 QoS 1 和 QoS 2;消費模式方面,除了預設的廣播消費,支援了MQTT5.0新增的共享消費模式,同時還支援了離線訊息。

5.png

上圖右側是小米基於 RocketMQ 的分散式 MQTT 架構圖。最上層為客戶端,釋出者和訂閱者連線到負載均衡器,這裡使用四層的負載均衡LVS, 主要目的是將請求均攤到各個MQTT Bridge。MQTT Bridge 即MQTT的服務節點,負責連線、訂閱、解析協議和訊息轉發。RocketMQ 作為儲存層,負責持久化訊息。類似於存算分離設計,MQTT Bridge 和 RocketMQ 均可獨立水平擴充套件。

得益於 RocketMQ 從 2020 年開始在小米大規模落地,我們採用RocketMQ來持久化 MQTT 訊息。整個釋出訂閱的過程演變成訊息從 Bridge傳送到RocketMQ,再從RocketMQ消費資料然後推送到訂閱端。每一個MQTT Bridge 內嵌 RocketMQ SDK ,充當 RocketMQ的客戶端,既作為生產者也作為消費者。

此外,持久化層支援了小米自研的訊息佇列Talos,提供了可插拔模式。根據業務資料的下游使用場景,部署時可靈活選擇任意一個訊息佇列作為持久化層。

6.png

MQTT協議的訊息結構和 RocketMQ 的訊息結構互相獨立,因此如果將MQTT協議的訊息持久化到 RocketMQ 中,必然需要做一定的匹配。MQTT Topic有多級,如圖中T1/T2/T3所示,為多級樹形結構。將 T1 看作一級 Topic,對應 RocketMQ 中的 Topic T1,則所有發往以 T1 開頭的 MQTT Topic的訊息都會持久化到 RocketMQ 的 T1 Topic中。

此時問題演變成如何區分一條訊息屬於哪個MQTT Topic,我們選擇將MQTT Topic設為訊息的 tag,MQTT訊息中的一些可變 header 直接放在RocketMQ 訊息屬性 KV 中,訊息體可以直接對映到 RocketMQ訊息的 Payload 中,這樣完成了MQTT訊息到RocketMQ訊息的對映。

7.png

除訊息資料之外,元資料是 MQTT 服務非常重要的一部分。MQTT Bridge 中儲存了兩類元資料,分別是客戶端元資料和訂閱元資料。客戶端元資料儲存了客戶端的連線資訊、連線時間、客戶端 ID、Netty channel 等資訊,我們實現了視覺化的控制檯,支援查詢MQTT服務的連線數,支援通過連線 ID 和客戶端 ID 查詢客戶端的資訊。此外,實現了客戶端上下線通知,使用者可以通過訂閱 MQTT  Topic實時獲取到某個客戶端的上線和下線事件。訂閱元資料儲存了客戶端和MQTT的對映關係,主要通過Trie樹來儲存訂閱關係,可以滿足萬用字元的方式訂閱,實現快速匹配。Bridge 通過訂閱 Topic找到客戶端,將訊息定向推送。

8.png

MQTT協議主要有三個服務質量等級 QoS 0、 QoS 1 和 QoS 2。QoS 0表示訊息最多發一次,可能存在丟失訊息的情況,效能最好,對於資料可靠性要求不高的業務較為實用。QoS 1 為訊息保證能至少到達一次,可能會重複,效能相對差一些。QoS 2 為訊息不丟不重,但效能最差。

9.png

上圖為QoS0的實現流程。QoS 指傳送端和接收端之間的訊息傳輸質量。釋出訊息時,MQTT Bridge 作為訊息的接收端,IoT 裝置作為釋出端。訂閱訊息時,MQTT Bridge作為釋出端,IoT裝置作為接收端。釋出和訂閱是兩個獨立的 QoS 過程,整條鏈路的 QoS 是這兩部分 QoS 的最低值,比如釋出是 QoS 1,訂閱是 QoS 0,則整條鏈路的 QoS 等級就是 0。左側是 QoS 0 釋出的過程。釋出端IoT將訊息推送給MQTT Bridge,Bridge 將訊息非同步推送到 RocketMQ,無需等待響應。圖中兩個箭頭的請求都可能失敗,可能會丟資料,可靠性很低。但由於鏈路短,因此效能較高。

10.png

上圖為 QoS 1的實現流程。IoT 終端釋出訊息之前,會先將其持久化到本地記憶體裡,Bridge 收到訊息之後,將訊息非同步推送到 RocketMQ,等待訊息持久化成功的結果後,再返回pubAck包給IoT,IoT 將記憶體裡的這條訊息刪除。傳送的請求可能會失敗,傳送端記憶體裡儲存了訊息,因此可以通過重試來實現訊息至少被髮一次,但也導致訊息可能會重複傳送。訂閱端同理。

11.png

QoS 2 的實現流程如上圖。在QoS 1時, Bridge接受到訊息後沒有將訊息持久化在自己的記憶體裡,而是直接將訊息推送到RocketMQ中。如果傳送端一直沒有收到 pubAck 包,則執行重發,重發之後 Bridge無法獲知收到的訊息是新訊息還是重發訊息,會造成訊息重複。QoS 2基於 messageID 來實現訊息去重。MQTT 協議要求 message ID 可以被重複使用,且有一定範圍,不會一直遞增。所以在利用 messageID 去重的同時,還要保證 messageID 在傳輸過程中不能有重複,用完後必須釋放。

依據這兩點前提,sender在傳送訊息之前,會將訊息持久化在自己的記憶體裡,再推送給 receiver。receiver 收到訊息之後也會放在本地記憶體裡,返回 PubRec 包給 sender,通知其已經收到訊息。如果 sender 一直沒有收到PubRec包,會不斷地重複傳送訊息。由於receiver 記憶體裡已經儲存了訊息,因此可以通過 messageID 來實現訊息的去重。傳送端在接收到 PubRec 包後釋出PubRel包,通知 receiver 可以清理記憶體中的訊息,也意味著sender已經知道訊息已被 receiver 持久化,此時再由 receiver 將訊息推給RocketMQ 並等待持久化響應。receiver 傳送 PubComp 包給 sender通知其可將PubRel包刪除。上圖中步驟 3.3可能失敗,因此sender必須在記憶體中快取PubRel包。上述流程存在兩步確認機制,第一個是保證訊息能到達 receiver ;第二個是保證將用過的 messageID 釋放掉,能夠實現 message ID 的重用。

12.png

推拉模型是 MQTT Bridge 實現訊息釋出訂閱的核心模型。假設以下場景:有四個訂閱端,其中訂閱端IoT-1和IoT-2分別訂閱了 Topic1/a、Topic1/b,IoT-3和IoT-4分別訂閱了Topic2/ a。第一、二臺裝置連線到第一個 Bridge,第三、四臺裝置連線到第二個 Bridge。當有新的訂閱關係過來時,會檢查訂閱一級 Topic。上圖中Bridge1 維護的兩個訂閱關係分別是Topic1/a、Topic1/b,它會啟動 RocketMQ的消費任務,從RocketMQ中消費 Topic1 中的資料。消費到的每條訊息通過tag判斷屬於哪個 MQTT Topic,再通過匹配樹將訊息推送給客戶端。每一個 RocketMQ Topic對應一個拉取訊息的任務,而一級 Topic下面可能有很多MQTT Topic,一旦MQTT Topic增多,推送給客戶端的延時就會變高。此外,一級 Topic下可能會存在很多終端,存在大量沒有被訂閱的無用訊息。

Topic級別的任務無法為每個客戶端都維護獨立的 offset 進度。只要 Bridge 接收到客戶端訂閱的請求就會開啟消費執行緒,Topic沒有訂閱時再將執行緒停掉。這樣存在的問題是如果長時間沒有訊息釋出,但訂閱關係一直存在,會導致執行緒空轉,存在很大的資源浪費。

13.png

社群在今年 3 月份開源新版MQTT架構,架構中引入了 notify 元件。作用為通知所有MQTT Bridge 一級Topic中是否有新的訊息產生。每一個 Bridge 中都內建 notify 元件,負責啟動針對 RocketMQ一級 Topic的叢集模式消費者,一旦一級 Topic中有訊息產生時,notify 元件能夠感知到訊息的產生,同時將訊息作為事件廣播給所有Bridge。其他 Bridge 收到訊息事件的通知後,會為連線在這臺 Bridge 上的每個終端開啟獨立拉取任務。拉取時不是拉取一級 Topic中的所有資料,而是通過消費 4.9.3 版本新引入的 LMQ,避免拉取一級 Topic中其他沒有被當前客戶端訂閱的訊息,以此避免了讀放大。另外,每個終端獨立的拉取任務可以為每個終端維護獨立的 offset 進度,方便實現離線訊息。

因此,只有新的訊息事件到來時,才會為終端開啟拉取任務。Topic沒有訊息或沒有任何訂閱關係之後,拉取任務將停止。升級後的推拉模型能夠支援離線訊息,大幅降低了延時,合理的啟停機制有效避免執行緒資源的浪費。

14.png

15.png

共享訂閱是MQTT5.0 協議新增的訂閱模式,可以理解為類似RocketMQ中的叢集模式消費。上圖左側為簡單的共享佇列例項。IoT 傳送幾條訊息到 Topic1/a 中,Topic1/a有三個訂閱端,每一條訊息只會被推送給其中一個訂閱端,比如IoT-sub-1會收到message1和message4,IoT-sub-1 會收到 message2 和message5,message 會收到message 3和message 6。其實現原理為:

每個 MQTT Bridge 會啟動一個針對Topic的拉取任務。RocketMQ 本身能夠支援叢集模式,MQTT Bridge又作為RocketMQ的客戶端,因此可以複用RocketMQ的共享訂閱模型。訂閱端訂閱時以某種特殊方式帶上消費者組名稱,連線到某臺 Bridge 後,該Bridge上就會用消費者組和訂閱的一級 Topic來啟動一個RocketMQ的叢集模式消費者。第二個訂閱端連線了第二臺 Bridge,該Bridge也會啟動一個消費者。只要Bridge 上有終端連線且他們處於一組內並訂閱了同一個 RocketMQ的一級 Topic,則所有符合要求的 Bridge 會組成叢集模式的消費者叢集。有新的訊息到達 Topic1 之後,只會被其中一個 Bridge 消費,那麼也只會被連線到該 Bridge 上的 IoT 訂閱端消費到。如果有多個訂閱端同時連到一個 Bridge 上,訊息應該推給哪個客戶端呢?我們在MQTT Bridge 內建多種策略,預設選擇輪詢策略。一條訊息發到 Bridge 後,Bridge可以輪詢傳送給任意一個IoT訂閱端,實現單 Bridge 多訂閱端的共享消費。

未來工作

16.png

未來,小米MQTT的工作將從以下四個方面繼續深入探索:

  • 架構:推拉模型繼續升級完善;

  • 功能:離線訊息、保留訊息、遺囑訊息等功能的完善;

  • 社群:擁抱開源社群,跟隨社群升級RocketMQ端雲一體化的架構;

  • 業務:小米汽車等IoT的場景推廣和應用。

加入 Apache RocketMQ 社群

十年鑄劍,Apache RocketMQ 的成長離不開全球接近 500 位開發者的積極參與貢獻,相信在下個版本你就是 Apache RocketMQ 的貢獻者,在社群不僅可以結識社群大牛,提升技術水平,也可以提升個人影響力,促進自身成長。

社群 5.0 版本正在進行著如火如荼的開發,另外還有接近 30 個 SIG(興趣小組)等你加入,歡迎立志打造世界級分散式系統的同學加入社群,新增社群開發者微信:rocketmq666 即可進群,參與貢獻,打造下一代訊息、事件、流融合處理平臺。

17.jpeg

微信掃碼新增小火箭進群

另外還可以加入釘釘群與 RocketMQ 愛好者一起廣泛討論:

18.png

釘釘掃碼加群

關注【Apache RocketMQ】公眾號,獲取更多技術乾貨!