物聯網微訊息佇列MQTT介紹-EMQX叢集搭建以及與SpringBoot整合
先看我們最後實現的一個效果
1.手機端向主題 topic111 傳送訊息,並接收。(手機測試工具名稱:MQTT偵錯程式)
2.控制檯列印
MQTT基本簡介
MQTT 是用於物聯網 (IoT) 的 OASIS 標準訊息傳遞協議。它被設計為一種極其輕量級的釋出/訂閱訊息傳輸,非常適合連線具有小程式碼足跡和最小網路頻寬的遠端裝置。
MQTT協議簡介
MQTT 是客戶端伺服器釋出/訂閱訊息傳輸協議。它重量輕、開放、簡單,並且易於實施。這些特性使其非常適合在許多情況下使用,包括受限制的環境,例如機器對機器 (M2M) 和物聯網 (IoT) 環境中的通訊,其中需要小程式碼足跡和/或網路頻寬非常寶貴。
該協議通過 TCP/IP 或其他提供有序、無損、雙向連線的網路協議執行。其特點包括:
· 使用釋出/訂閱訊息模式,提供一對多的訊息分發和應用程式的解耦。
· 與有效負載內容無關的訊息傳輸。
· 訊息傳遞的三種服務質量:
o “最多一次”,根據操作環境的最大努力傳遞訊息。可能會發生訊息丟失。例如,此級別可用於環境感測器資料,其中單個讀數是否丟失並不重要,因為下一個讀數將很快釋出。
o “至少一次”,保證訊息到達但可能出現重複。
o “Exactly once”,保證訊息只到達一次。例如,此級別可用於重複或丟失訊息可能導致應用不正確費用的計費系統。
· 最小化傳輸開銷和協議交換以減少網路流量。
· 發生異常斷開時通知相關方的機制。
EMQX簡介
通過開放標準物聯網協議 MQTT、CoAP 和 LwM2M 連線任何裝置。使用 EMQX Enterprise 叢集輕鬆擴充套件到數千萬併發 MQTT 連線。
並且EMQX還是開源的,又支援叢集,所以還是一個比較不錯的選擇
EMQX叢集搭建
前期準備:
1.兩臺伺服器:我的兩個伺服器一臺是騰訊雲、一臺是阿里雲的(不要問為什麼,薅羊毛得來的)咱們暫且叫他們 mqtt_service_aliyun和
mqtt_service_txyun 吧。
2.一個域名: mqtt.zhouhong.icu
安裝開始
1.分別在兩臺伺服器上執行以下操作進行安裝(如果是單機:只需要進行下面1、2操作就安裝完成了)
## 1.下載 wget http://www.emqx.com/zh/downloads/broker/4.4.4/emqx-4.4.4-otp24.1.5-3-el8-amd64.rpm ## 2.安裝 sudo yum install emqx-4.4.4-otp24.1.5-3-el8-amd64.rpm ## 3.修改配置檔案 vim /etc/emqx/emqx.conf ## 4.修改以下內容 ## 注意node.name是當前這臺伺服器名稱 node.name = [email protected] cluster.static.seeds = [email protected],[email protected] cluster.discovery = static cluster.name = my-mqtt-cluster
2.分別啟動兩臺伺服器的EMQX
sudo emqx start
3.到瀏覽器輸入 http://xxx.xx.xxx.xxx:18083/ 檢視(隨便一臺都可以,預設賬號admin 密碼public),注意開啟18083,1883 安全組
4.nginx負載均衡
nginx搭建很簡單略過,大家只需要修改以下nginx.conf裡面的內容即可
stream { upstream mqtt.zhouhong.icu { zone tcp_servers 64k; hash $remote_addr; server xxx.xx.xxx.xx:1883 weight=1 max_fails=3 fail_timeout=30s; server xxx.xx.xxx.xx:1883 weight=1 max_fails=3 fail_timeout=30s; } server { listen 8883 ssl; status_zone tcp_server; proxy_pass mqtt.zhouhong.icu; proxy_buffer_size 4k; ssl_handshake_timeout 15s; ssl_certificate /etc/nginx/7967358_www.mqtt.zhouhong.icu.pem; ssl_certificate_key /etc/nginx/7967358_www.mqtt.zhouhong.icu.key; } }
與SpringBoot整合並實現伺服器端監控對應topic下的訊息
1.專案搭建
-
引入MQTT相關jar包
<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
-
yml配置檔案 (如果大家沒搭建好的話,可以直接使用我搭建的這個)
server: port: 8080 mqtt: ## 單機版--只需要把域名改為ip既可 hostUrl: tcp://mqtt.zhouhong.icu:1883 username: admin password: public ## 服務端 clientId (傳送端自己定義) clientId: service_client_id cleanSession: true reconnect: true timeout: 100 keepAlive: 100 defaultTopic: topic111 qos: 0
-
屬性配置
/** * description: * date: 2022/6/16 15:51 * @author: zhouhong */ @Component @ConfigurationProperties("mqtt") @Data public class MqttProperties { /** * 使用者名稱 */ private String username; /** * 密碼 */ private String password; /** * 連線地址 */ private String hostUrl; /** * 客戶端Id,同一臺伺服器下,不允許出現重複的客戶端id */ private String clientId; /** * 預設連線主題 */ private String topic; /** * 超時時間 */ private int timeout; /** * 設定會話心跳時間 單位為秒 伺服器會每隔1.5*20秒的時間向客戶端 * 傳送個訊息判斷客戶端是否線上,但這個方法並沒有重連的機制 */ private int keepAlive; /** * 設定是否清空session,這裡如果設定為false表示伺服器會保留客戶端的連 * 接記錄,這裡設定為true表示每次連線到伺服器都以新的身份連線 */ private Boolean cleanSession; /** * 是否斷線重連 */ private Boolean reconnect; /** * 連線方式 */ private Integer qos; }
-
傳送訊息回撥
/** * description: 發生訊息成功後 的 回撥 * date: 2022/6/16 15:55 * * @author: zhouhong */ @Component @Log4j2 public class MqttSendCallBack implements MqttCallbackExtended { /** * 客戶端斷開後觸發 * @param throwable */ @Override public void connectionLost(Throwable throwable) { log.info("傳送訊息回撥: 連線斷開,可以做重連"); } /** * 客戶端收到訊息觸發 * * @param topic 主題 * @param mqttMessage 訊息 */ @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { log.info("傳送訊息回撥: 接收訊息主題 : " + topic); log.info("傳送訊息回撥: 接收訊息內容 : " + new String(mqttMessage.getPayload())); } /** * 釋出訊息成功 * * @param token token */ @Override public void deliveryComplete(IMqttDeliveryToken token) { String[] topics = token.getTopics(); for (String topic : topics) { log.info("傳送訊息回撥: 向主題:" + topic + "傳送訊息成功!"); } try { MqttMessage message = token.getMessage(); byte[] payload = message.getPayload(); String s = new String(payload, "UTF-8"); log.info("傳送訊息回撥: 訊息的內容是:" + s); } catch (MqttException e) { e.printStackTrace(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } /** * 連線emq伺服器後觸發 * * @param b * @param s */ @Override public void connectComplete(boolean b, String s) { log.info("--------------------ClientId:" + MqttAcceptClient.client.getClientId() + "客戶端連線成功!--------------------"); } }
-
接收訊息回撥
/** * description: 接收訊息後的回撥 * date: 2022/6/16 15:52 * * @author: zhouhong */ @Component @Log4j2 public class MqttAcceptCallback implements MqttCallbackExtended { @Resource private MqttAcceptClient mqttAcceptClient; /** * 客戶端斷開後觸發 * * @param throwable */ @Override public void connectionLost(Throwable throwable) { log.info("接收訊息回撥: 連線斷開,可以做重連"); if (MqttAcceptClient.client == null || !MqttAcceptClient.client.isConnected()) { log.info("接收訊息回撥: emqx重新連線...................................................."); mqttAcceptClient.reconnection(); } } /** * 客戶端收到訊息觸發 * * @param topic 主題 * @param mqttMessage 訊息 */ @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { log.info("接收訊息回撥: 接收訊息主題 : " + topic); log.info("接收訊息回撥: 接收訊息內容 : " + new String(mqttMessage.getPayload())); } /** * 釋出訊息成功 * * @param token token */ @Override public void deliveryComplete(IMqttDeliveryToken token) { String[] topics = token.getTopics(); for (String topic : topics) { log.info("接收訊息回撥: 向主題:" + topic + "傳送訊息成功!"); } try { MqttMessage message = token.getMessage(); byte[] payload = message.getPayload(); String s = new String(payload, "UTF-8"); log.info("接收訊息回撥: 訊息的內容是:" + s); } catch (MqttException e) { e.printStackTrace(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } /** * 連線emq伺服器後觸發 * * @param b * @param s */ @Override public void connectComplete(boolean b, String s) { log.info("--------------------ClientId:" + MqttAcceptClient.client.getClientId() + "客戶端連線成功!--------------------"); // 以/#結尾表示訂閱所有以test開頭的主題 // 訂閱所有機構主題 mqttAcceptClient.subscribe("topic111", 0); } }
-
發訊息
/** * description: 傳送訊息 * date: 2022/6/16 16:01 * * @author: zhouhong */ @Component public class MqttSendClient { @Autowired private MqttSendCallBack mqttSendCallBack; @Autowired private MqttProperties mqttProperties; public MqttClient connect() { MqttClient client = null; try { String uuid = UUID.randomUUID().toString().replaceAll("-",""); client = new MqttClient(mqttProperties.getHostUrl(),uuid , new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(mqttProperties.getUsername()); options.setPassword(mqttProperties.getPassword().toCharArray()); options.setConnectionTimeout(mqttProperties.getTimeout()); options.setKeepAliveInterval(mqttProperties.getKeepAlive()); options.setCleanSession(true); options.setAutomaticReconnect(false); try { // 設定回撥 client.setCallback(mqttSendCallBack); client.connect(options); } catch (Exception e) { e.printStackTrace(); } } catch (Exception e) { e.printStackTrace(); } return client; } /** * 釋出訊息 * 主題格式: server:report:$orgCode(引數實際使用機構程式碼) * * @param retained 是否保留 * @param pushMessage 訊息體 */ public void publish(boolean retained, String topic, String pushMessage) { MqttMessage message = new MqttMessage(); message.setQos(mqttProperties.getQos()); message.setRetained(retained); message.setPayload(pushMessage.getBytes()); MqttClient mqttClient = connect(); try { mqttClient.publish(topic, message); } catch (MqttException e) { e.printStackTrace(); } finally { disconnect(mqttClient); close(mqttClient); } } /** * 關閉連線 * * @param mqttClient */ public static void disconnect(MqttClient mqttClient) { try { if (mqttClient != null) { mqttClient.disconnect(); } } catch (MqttException e) { e.printStackTrace(); } } /** * 釋放資源 * * @param mqttClient */ public static void close(MqttClient mqttClient) { try { if (mqttClient != null) { mqttClient.close(); } } catch (MqttException e) { e.printStackTrace(); } } }
-
接收訊息
/** * description: 伺服器段端連線訂閱訊息、監控topic * date: 2022/6/16 15:52 * * @author: zhouhong */ @Component @Log4j2 public class MqttAcceptClient { @Autowired @Lazy private MqttAcceptCallback mqttAcceptCallback; @Autowired private MqttProperties mqttProperties; public static MqttClient client; private static MqttClient getClient() { return client; } private static void setClient(MqttClient client) { MqttAcceptClient.client = client; } /** * 客戶端連線 */ public void connect() { MqttClient client; try { // clientId 使用伺服器 yml裡面配置的 clientId client = new MqttClient(mqttProperties.getHostUrl(), mqttProperties.getClientId(), new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(mqttProperties.getUsername()); options.setPassword(mqttProperties.getPassword().toCharArray()); options.setConnectionTimeout(mqttProperties.getTimeout()); options.setKeepAliveInterval(mqttProperties.getKeepAlive()); options.setAutomaticReconnect(mqttProperties.getReconnect()); options.setCleanSession(mqttProperties.getCleanSession()); MqttAcceptClient.setClient(client); try { // 設定回撥 client.setCallback(mqttAcceptCallback); client.connect(options); } catch (Exception e) { e.printStackTrace(); } } catch (Exception e) { e.printStackTrace(); } } /** * 重新連線 */ public void reconnection() { try { client.connect(); } catch (MqttException e) { e.printStackTrace(); } } /** * 訂閱某個主題 * * @param topic 主題 * @param qos 連線方式 */ public void subscribe(String topic, int qos) { log.info("==============開始訂閱主題==============" + topic); try { client.subscribe(topic, qos); } catch (MqttException e) { e.printStackTrace(); } } /** * 取消訂閱某個主題 * * @param topic */ public void unsubscribe(String topic) { log.info("==============開始取消訂閱主題==============" + topic); try { client.unsubscribe(topic); } catch (MqttException e) { e.printStackTrace(); } } }
-
服務端啟動時連線訂閱主題並監控
/** * description: 啟動後連線 MQTT 伺服器, 監聽 mqtt/my_topic 這個topic傳送的訊息 * date: 2022/6/16 15:57 * @author: zhouhong */ @Configuration public class MqttConfig { @Resource private MqttAcceptClient mqttAcceptClient; @Bean public MqttAcceptClient getMqttPushClient() { mqttAcceptClient.connect(); return mqttAcceptClient; } }
-
發訊息控制類
/** * description: 發訊息控制類 * date: 2022/6/16 15:58 * * @author: zhouhong */ @RestController public class SendController { @Resource private MqttSendClient mqttSendClient; @PostMapping("/mqtt/sendmessage") public void sendMessage(@RequestBody SendParam sendParam) { mqttSendClient.publish(false,sendParam.getTopic(),sendParam.getMessageContent()); } }
2.測試
- postman呼叫發訊息介面
- 控制檯日誌
- 使用另外一個移動端MQTT除錯工具測試
- 手機端向主題 topic111 傳送訊息,並接收。
2. 控制檯列印
- EMAS Serverless到底有多便利?
- 時隔4個月我面試位元組又掛了|總結與展望
- EMAS Serverless系列~4步教你快速搭建小程式
- uni-app 從0 到 1 製作一個專案,收藏等於學會
- 聊聊客戶檔案模型的設計與管理
- Java NIO全面詳解(看這篇就夠了)
- 面試突擊74:properties和yml有什麼區別?
- 長篇圖解java反射機制及其應用場景
- 記一次 ClickHouse 效能測試
- Linux—磁碟管理
- luoguP3224 [HNOI2012]永無鄉【線段樹,並查集】
- 麻了,程式碼改成多執行緒,竟有9大問題
- 急如閃電快如風,彩虹女神躍長空,Go語言高效能Web框架Iris專案實戰-初始化專案ep00
- HC32L110 系列 M0 MCU 的介紹和Win10下DAP-Link, ST-Link, J-Link的燒錄
- 使用 Golang 程式碼生成圖表的開源庫對比
- MapReduce入門實戰
- selenium基本用法
- 紅黑樹以及JAVA實現(一)
- Docker常用命令
- 資料結構與演算法【Java】03---棧