「Android基於MQTT實現訊息通知」

語言: CN / TW / HK

theme: fancy highlight: an-old-hope


「Android基於MQTT實現訊息通知」

一、寫在前面

在對接專案中IoT時,發現目前有對MQTT做了接入,這裡記錄一下,官方的資料比較詳細,這裡主要從實現細節出發;對具體的需求以及配套的技術方案進行整理,以供參考。

一、IoT與MQTT

提到 IoT(Internet of Things)、IIoT(Industrial IoT ) 不得不說 MQTT,其被廣泛的應用在物聯網以及工業物聯網之中,是一種訊息傳遞協議。不同於我們所認識的平時常見的一些智慧裝置,如手機、電腦、平板等;這些裝置一般都有著很好的計算能力,所依賴的網路環境很優質。但是一般的硬體裝置效能較差,網路環境不穩定,而MQTT則是專門針對於硬體效能,網路狀態不穩定場景下而生的。有著天然的優勢。

二、什麼是MQTT

MQTT是用於物聯網的最常用的訊息傳遞協議 (IoT)。MQTT代表MQ遙測運輸。該協議是一組規則,它定義了IoT裝置如何通過Internet釋出訂閱資料。用於IoT和工業IoT(IIT)裝置(例如嵌入式裝置,感測器,工業PLC等)之間的訊息傳遞和資料交換。協議是事件驅動的,並使用釋出/訂閱(PUB / SUB)模式連線裝置。釋出者和接收器(訂閱者)通過主題通訊,並彼此分離。它們之間的連線由MQTT代理處理。MQTT代理過濾所有傳入訊息並將其正確分發給訂閱者。

三、與傳統Http的區別
  • MQTT以資料為中心,底層基於TCP連結,直接操作輕量級的二進位制資料,並且資料包很小(可以小到一個字元,兩個位元組)。劃重點,由於這個特性,其對於網路環境狀態要求沒有HTTP那麼高,這也是為什麼廣泛應用於IoT裝置的原因之一。
  • MQTT基於釋出/訂閱模式,區別於HTTP的請求/回撥模式,這就決定了一個同一個裝置即可以是客戶端(Client)同時也可以是服務端(Server),回想釋出訂閱模式,訊息的釋出可以是1toN(N>=0),而HTTP則是1to1。
  • MQTT的釋出/訂閱架構決定了其無法基於UDP(面向無連結),而HTPP底層可以是基於TCP或者UDP。
  • 訊息體量的區別,MQTT資料包很小,而HTTP資料量一般較大。
四、MQTT構成部分
1.Publish&Subscribe

MQTT對於釋出訂閱做了自身的解耦處理,主要是從三個維度出發,1.空間解耦:釋出者和訂閱者不需要相互瞭解(例如,沒有IP地址和埠的交換)。2.時間解耦:釋出者和訂閱者不需要同時執行。3.同步解耦:在釋出或接收期間,兩個元件的操作不需要中斷。詳細資訊

2.Client、Broker

詳細資訊

3.Topics&Best Practices

主要需要注意Topics的匹配規則,分為單項萬用字元,與多項萬用字元。單項以 + 連線:this/is/+/single,其中僅僅 + 部分可以被替換為單個路徑(以 / 分割)。多項萬用字元僅支援在尾端支援:this/is/multi/#,並且是多級的。

詳細資訊

4.Keep Alive

保活時效,包括其他的欄位,官方文件都給出了很詳細的解釋,認真瞭解一項技術實現,官方的文件還是最好的選擇文件。這裡主要基本認識MQTT是個什麼東西,具體的實現細節與規範也不是一兩句話可以說的清楚的,且可能存在誤導的風險。MQTT

五、MQTT實際專案中的使用
1.實現什麼需求?

以實際的專案為例,現需要實現的功能有:

  • 服務端下發訊息通知到IoT裝置,訊息以Type區分,不同的訊息需對應不同的處理措施。
  • 根據訊息的不同,有語音播放、叫號、本地資料更新、服務端配置下發。

功能相對很簡單,總結就是服務端推送訊息,裝置根據訊息做出響應。

2.具體實現方案

匯入依賴

java implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0' implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

主要分為幾個類:a.主體請求Client,b.資料返回的回撥dataCallback,c.連結狀態回撥connectCallback,d.具體訊息處理策略IHandler。方案主要就包括這幾個大類,逐步實現各個細節。

3.資料介面回撥IDataCallback

kotlin interface IDataCallback { fun connectionError(cause: Throwable?) fun dataMessage(dataMessage: String) }

4.連線狀態回撥IConnectionCallback

kotlin interface IConnectionCallback { fun connectSuccess() fun connectFail(reason: Throwable?) }

5.CMqttClient連結類

在實現之前,列舉幾個關鍵的引數,引數配置在MqttConnectOptions

kotlin val options = MqttConnectOptions() //預設為true,表示非持久訂閱,無論服務端或者客戶端重啟,不會保持狀態,重啟後指定訊息也無法送達 //設定為false,表示持久訂閱,服務端與客戶端重啟或重鏈,指定訊息可以送達 options.isCleanSession = false //連結的使用者名稱(賬號名) options.userName = user //連結的使用者密碼(賬戶密碼) options.password = password.toCharArray() //連結超時值s,預設30s,為0時,等待網路狀態,即成功或失敗 options.connectionTimeout = connectTimeout //預設60s,檢測服務端是否可用,為0時則禁止客戶端保活,保活間隔內,沒有訊息的情況下客戶端會通過ping來檢測連結是否保持 options.keepAliveInterval = keepAliveInterval //是否開啟自動重連線,初始嘗試重連是等待1s,失敗情況下,延遲加倍,直到2分鐘。 options.isAutomaticReconnect = true

關於自動重新連線有三個必要條件,cleanSession需要設定為falseisAutomaticReconnect需要設定為true,並且初始已經連線過。劃重點,這裡就要求,MQTT雖然可以自動重試連線當時必須有這三個前提,那麼首次由於網路等其他原因未能連線的,這層的重試機制是需要我們自身去實現的,也就是需要保證首次能夠連線到服務端。原始碼以及註釋:

java //Reconnect Only appropriate if cleanSession is false and we were connected. Declare as synchronized to avoid multiple calls to this method to send connect multiple times synchronized void reconnect() { //.... } //註釋說的很明確三個條件,1.cleanSession需設定為false,2.isAutomaticReconnect需設定為true,並且是之前是已經連線過了。

kotlin class CMqttClient { //正在連線伺服器 private var connecting = false private var mqttAndroidClient: MqttAndroidClient? = null //連線到伺服器 fun connectToServer( context: Context, host: String, clientId: String, accountName: String, accountPsw: String, connectTimeout: Int = 20, keepAliveTime: Int = 60, connectionCallback: IConnectionCallback? = null, dataCallback: IDataCallback? = null ) { if(null == mqttAndroidClient) { mqttAndroidClient = MqttAndroidClient(context, host, clientId) mqttAndroidClient?.setCallback(object: MqttCallback { override fun connectionLost(cause: Throwable?) { dataCallback?.connectionError(cause) } override fun messageArrived(topic: String?, message: MqttMessage?) { message?.let { val payLoad = String(it.payload) Log.d(xxxxxxxxx) dataCallback?.dataMessage(payLoad) } } override fun deliveryComplete(token: IMqttDeliveryToken?) { //do something } }) } connecting = true val options = MqttConnectOptions() options.isCleanSession = false options.userName = user options.password = password.toCharArray() options.connectionTimeout = connectTimeout options.keepAliveInterval = keepAliveInterval options.isAutomaticReconnect = true mqttAndroidClient?.connect(options, null, object: IMqttActionListener{ override fun onSuccess(asyncActionToken: IMqttToken?) { connecting = false connectionCallback?.connectSuccess() } override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) { connecting = false connectionCallback?.connectFail(exception) } }) } //是否已經建立連結 fun hasConnected(): Boolean { return try { mqttAndroidClient?.isConnected == true } catch { false } } //斷開與服務端連線 fun disConnectFromServer() { if(mqttAndroidClient?.isConnected == true) { mqttAndroidClient?.disconnect() } connecting = false } //釋放資源 fun relaseClient() { mqttAndroidClient?.close() mqttAndroidClient = null connecting = false } //訂閱訊息 fun subscribe(topics: Array<String>, qos: IntArray, timeOut: Long = 2000): Boolean { return mqttAndroidClient?.let { try{ val mqttToken = it.subscribe(topics, qos) mqttToken.waitForCompletion(timeout) mqttToken.isComplete true } catch { Log.d(xxxx) false } }?: false } //取消訂閱 fun unSubscribe(): Boolean { //省略 } }

需要注意的是這裡的ClientId,是唯一性的,像IoT裝置以裝置deviceId作為ClientId,如果換成使用者userId,當在多裝置登入的情況下,那麼重試等其他一些機制會影響預期結果,給排查問題帶來一定的難度。

5.訊息處理介面IHandler

kotlin interface IHandler { fun handlerMessage(message: String) }

訊息體中會包含不同的type,根據不同的type實現不同的處理器,當然為了靈活還要藉助註解機制

6.註解模版

kotlin @Target(AnnotationTarget.CLASS) @Retention(AnnotationRetention.SOURCE) annotation class MQTTHandler { val groupName: String val type: String }

通過反射的方式載入對應的IHandler實現類,核心程式碼

```kotlin interface IHandlerProvide { fun provideHandler(handlerType: String): IHandler? fun release() }

class XXXHandlerProvider: IHandlerProvide { override fun provideHandler(handlerType: String): IHandler? { //find the imp IHandler } override fun release() { //release the source } }

override fun provideHandler(handlerType: String): IHandler? { val handlerClass = Maneger.instance.findIHandlersByGroup(GroupName)?.get(handlerType) } ```

使用時,直接加上註解:

kotlin @MQTTHandler(groupName = groupxxxx, type = typeNamexxxx) class TestHandler: IHandler { override fun handlerMessage(message: String) { //do something what you want } }

整個流程的主要部分已經給出,核心是通過不同的訊息type查找出對應的處理器;當然這部分主要是由註解完成的,對於處理器的查詢則是通過反射的方式來進行匹配的。

六、文件

MQTT官網