「Android基於MQTT實現訊息通知」
theme: fancy highlight: an-old-hope
「Android基於MQTT實現訊息通知」
一、寫在前面
在對接專案中IoT時,發現目前有對MQTT做了接入,這裡記錄一下,官方的資料比較詳細,這裡主要從實現細節出發;對具體的需求以及配套的技術方案進行整理,以供參考。
一、IoT與MQTT
提到 IoT(Internet of Things)、IIoT(Industrial IoT ) 不得不說 MQTT,其被廣泛的應用在物聯網以及工業物聯網之中,是一種訊息傳遞協議。不同於我們所認識的平時常見的一些智慧裝置,如手機、電腦、平板等;這些裝置一般都有著很好的計算能力,所依賴的網路環境很優質。但是一般的硬體裝置效能較差,網路環境不穩定,而MQTT則是專門針對於硬體效能,網路狀態不穩定場景下而生的。有著天然的優勢。
二、什麼是MQTT
MQTT是用於物聯網的最常用的訊息傳遞協議 (IoT)。MQTT代表MQ遙測運輸。該協議是一組規則,它定義了IoT裝置如何通過Internet釋出和訂閱資料。用於IoT和工業IoT(IIT)裝置(例如嵌入式裝置,感測器,工業PLC等)之間的訊息傳遞和資料交換。協議是事件驅動的,並使用釋出/訂閱(PUB / SUB)模式連線裝置。釋出者和接收器(訂閱者)通過主題通訊,並彼此分離。它們之間的連線由MQTT代理處理。MQTT代理過濾所有傳入訊息並將其正確分發給訂閱者。
三、與傳統Http的區別
- MQTT以資料為中心,底層基於TCP連結,直接操作輕量級的二進位制資料,並且資料包很小(可以小到一個字元,兩個位元組)。劃重點,由於這個特性,其對於網路環境狀態要求沒有HTTP那麼高,這也是為什麼廣泛應用於IoT裝置的原因之一。
- MQTT基於釋出/訂閱模式,區別於HTTP的請求/回撥模式,這就決定了一個同一個裝置即可以是客戶端(Client)同時也可以是服務端(Server),回想釋出訂閱模式,訊息的釋出可以是1toN(N>=0),而HTTP則是1to1。
- MQTT的釋出/訂閱架構決定了其無法基於UDP(面向無連結),而HTPP底層可以是基於TCP或者UDP。
- 訊息體量的區別,MQTT資料包很小,而HTTP資料量一般較大。
四、MQTT構成部分
1.Publish&Subscribe
MQTT對於釋出訂閱做了自身的解耦處理,主要是從三個維度出發,1.空間解耦:釋出者和訂閱者不需要相互瞭解(例如,沒有IP地址和埠的交換)。2.時間解耦:釋出者和訂閱者不需要同時執行。3.同步解耦:在釋出或接收期間,兩個元件的操作不需要中斷。詳細資訊
2.Client、Broker
3.Topics&Best Practices
主要需要注意Topics的匹配規則,分為單項萬用字元,與多項萬用字元。單項以 + 連線:this/is/+/single
,其中僅僅 + 部分可以被替換為單個路徑(以 / 分割)。多項萬用字元僅支援在尾端支援:this/is/multi/#
,並且是多級的。
4.Keep Alive
保活時效,包括其他的欄位,官方文件都給出了很詳細的解釋,認真瞭解一項技術實現,官方的文件還是最好的選擇文件。這裡主要基本認識MQTT是個什麼東西,具體的實現細節與規範也不是一兩句話可以說的清楚的,且可能存在誤導的風險。MQTT
五、MQTT實際專案中的使用
1.實現什麼需求?
以實際的專案為例,現需要實現的功能有:
- 服務端下發訊息通知到IoT裝置,訊息以Type區分,不同的訊息需對應不同的處理措施。
- 根據訊息的不同,有語音播放、叫號、本地資料更新、服務端配置下發。
功能相對很簡單,總結就是服務端推送訊息,裝置根據訊息做出響應。
2.具體實現方案
匯入依賴
java
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
主要分為幾個類:a.主體請求Client,b.資料返回的回撥dataCallback,c.連結狀態回撥connectCallback,d.具體訊息處理策略IHandler。方案主要就包括這幾個大類,逐步實現各個細節。
3.資料介面回撥IDataCallback
kotlin
interface IDataCallback {
fun connectionError(cause: Throwable?)
fun dataMessage(dataMessage: String)
}
4.連線狀態回撥IConnectionCallback
kotlin
interface IConnectionCallback {
fun connectSuccess()
fun connectFail(reason: Throwable?)
}
5.CMqttClient連結類
在實現之前,列舉幾個關鍵的引數,引數配置在MqttConnectOptions中
kotlin
val options = MqttConnectOptions()
//預設為true,表示非持久訂閱,無論服務端或者客戶端重啟,不會保持狀態,重啟後指定訊息也無法送達
//設定為false,表示持久訂閱,服務端與客戶端重啟或重鏈,指定訊息可以送達
options.isCleanSession = false
//連結的使用者名稱(賬號名)
options.userName = user
//連結的使用者密碼(賬戶密碼)
options.password = password.toCharArray()
//連結超時值s,預設30s,為0時,等待網路狀態,即成功或失敗
options.connectionTimeout = connectTimeout
//預設60s,檢測服務端是否可用,為0時則禁止客戶端保活,保活間隔內,沒有訊息的情況下客戶端會通過ping來檢測連結是否保持
options.keepAliveInterval = keepAliveInterval
//是否開啟自動重連線,初始嘗試重連是等待1s,失敗情況下,延遲加倍,直到2分鐘。
options.isAutomaticReconnect = true
關於自動重新連線有三個必要條件,cleanSession需要設定為false,isAutomaticReconnect需要設定為true,並且初始已經連線過。劃重點,這裡就要求,MQTT雖然可以自動重試連線當時必須有這三個前提,那麼首次由於網路等其他原因未能連線的,這層的重試機制是需要我們自身去實現的,也就是需要保證首次能夠連線到服務端。原始碼以及註釋:
java
//Reconnect Only appropriate if cleanSession is false and we were connected. Declare as synchronized to avoid multiple calls to this method to send connect multiple times
synchronized void reconnect() {
//....
}
//註釋說的很明確三個條件,1.cleanSession需設定為false,2.isAutomaticReconnect需設定為true,並且是之前是已經連線過了。
kotlin
class CMqttClient {
//正在連線伺服器
private var connecting = false
private var mqttAndroidClient: MqttAndroidClient? = null
//連線到伺服器
fun connectToServer(
context: Context,
host: String,
clientId: String,
accountName: String,
accountPsw: String,
connectTimeout: Int = 20,
keepAliveTime: Int = 60,
connectionCallback: IConnectionCallback? = null,
dataCallback: IDataCallback? = null
) {
if(null == mqttAndroidClient) {
mqttAndroidClient = MqttAndroidClient(context, host, clientId)
mqttAndroidClient?.setCallback(object: MqttCallback {
override fun connectionLost(cause: Throwable?) {
dataCallback?.connectionError(cause)
}
override fun messageArrived(topic: String?, message: MqttMessage?) {
message?.let {
val payLoad = String(it.payload)
Log.d(xxxxxxxxx)
dataCallback?.dataMessage(payLoad)
}
}
override fun deliveryComplete(token: IMqttDeliveryToken?) {
//do something
}
})
}
connecting = true
val options = MqttConnectOptions()
options.isCleanSession = false
options.userName = user
options.password = password.toCharArray()
options.connectionTimeout = connectTimeout
options.keepAliveInterval = keepAliveInterval
options.isAutomaticReconnect = true
mqttAndroidClient?.connect(options, null, object: IMqttActionListener{
override fun onSuccess(asyncActionToken: IMqttToken?) {
connecting = false
connectionCallback?.connectSuccess()
}
override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
connecting = false
connectionCallback?.connectFail(exception)
}
})
}
//是否已經建立連結
fun hasConnected(): Boolean {
return try {
mqttAndroidClient?.isConnected == true
} catch {
false
}
}
//斷開與服務端連線
fun disConnectFromServer() {
if(mqttAndroidClient?.isConnected == true) {
mqttAndroidClient?.disconnect()
}
connecting = false
}
//釋放資源
fun relaseClient() {
mqttAndroidClient?.close()
mqttAndroidClient = null
connecting = false
}
//訂閱訊息
fun subscribe(topics: Array<String>, qos: IntArray, timeOut: Long = 2000): Boolean {
return mqttAndroidClient?.let {
try{
val mqttToken = it.subscribe(topics, qos)
mqttToken.waitForCompletion(timeout)
mqttToken.isComplete
true
} catch {
Log.d(xxxx)
false
}
}?: false
}
//取消訂閱
fun unSubscribe(): Boolean {
//省略
}
}
需要注意的是這裡的ClientId,是唯一性的,像IoT裝置以裝置deviceId作為ClientId,如果換成使用者userId,當在多裝置登入的情況下,那麼重試等其他一些機制會影響預期結果,給排查問題帶來一定的難度。
5.訊息處理介面IHandler
kotlin
interface IHandler {
fun handlerMessage(message: String)
}
訊息體中會包含不同的type,根據不同的type實現不同的處理器,當然為了靈活還要藉助註解機制。
6.註解模版
kotlin
@Target(AnnotationTarget.CLASS)
@Retention(AnnotationRetention.SOURCE)
annotation class MQTTHandler {
val groupName: String
val type: String
}
通過反射的方式載入對應的IHandler實現類,核心程式碼
```kotlin interface IHandlerProvide { fun provideHandler(handlerType: String): IHandler? fun release() }
class XXXHandlerProvider: IHandlerProvide { override fun provideHandler(handlerType: String): IHandler? { //find the imp IHandler } override fun release() { //release the source } }
override fun provideHandler(handlerType: String): IHandler? { val handlerClass = Maneger.instance.findIHandlersByGroup(GroupName)?.get(handlerType) } ```
使用時,直接加上註解:
kotlin
@MQTTHandler(groupName = groupxxxx, type = typeNamexxxx)
class TestHandler: IHandler {
override fun handlerMessage(message: String) {
//do something what you want
}
}
整個流程的主要部分已經給出,核心是通過不同的訊息type查找出對應的處理器;當然這部分主要是由註解完成的,對於處理器的查詢則是通過反射的方式來進行匹配的。