物聯網微訊息佇列MQTT介紹-EMQX叢集搭建以及與SpringBoot整合

語言: CN / TW / HK

先看我們最後實現的一個效果

1.手機端向主題 topic111 傳送訊息,並接收。(手機測試工具名稱:MQTT偵錯程式)

2.控制檯列印

MQTT基本簡介

MQTT 是用於物聯網 (IoT) 的 OASIS 標準訊息傳遞協議。它被設計為一種極其輕量級的釋出/訂閱訊息傳輸,非常適合連線具有小程式碼足跡和最小網路頻寬的遠端裝置。

MQTT協議簡介

MQTT 是客戶端伺服器釋出/訂閱訊息傳輸協議。它重量輕、開放、簡單,並且易於實施。這些特性使其非常適合在許多情況下使用,包括受限制的環境,例如機器對機器 (M2M) 和物聯網 (IoT) 環境中的通訊,其中需要小程式碼足跡和/或網路頻寬非常寶貴。

該協議通過 TCP/IP 或其他提供有序、無損、雙向連線的網路協議執行。其特點包括:

·         使用釋出/訂閱訊息模式,提供一對多的訊息分發和應用程式的解耦。

·         與有效負載內容無關的訊息傳輸。

·         訊息傳遞的三種服務質量:

o    “最多一次”,根據操作環境的最大努力傳遞訊息。可能會發生訊息丟失。例如,此級別可用於環境感測器資料,其中單個讀數是否丟失並不重要,因為下一個讀數將很快釋出。

o    “至少一次”,保證訊息到達但可能出現重複。

o    “Exactly once”,保證訊息只到達一次。例如,此級別可用於重複或丟失訊息可能導致應用不正確費用的計費系統。

·         最小化傳輸開銷和協議交換以減少網路流量。

·         發生異常斷開時通知相關方的機制。

EMQX簡介

通過開放標準物聯網協議 MQTT、CoAP 和 LwM2M 連線任何裝置。使用 EMQX Enterprise 叢集輕鬆擴充套件到數千萬併發 MQTT 連線。

並且EMQX還是開源的,又支援叢集,所以還是一個比較不錯的選擇

EMQX叢集搭建

前期準備:

1.兩臺伺服器:我的兩個伺服器一臺是騰訊雲、一臺是阿里雲的(不要問為什麼,薅羊毛得來的)咱們暫且叫他們 mqtt_service_aliyun和

mqtt_service_txyun 吧。

2.一個域名: mqtt.zhouhong.icu

安裝開始

1.分別在兩臺伺服器上執行以下操作進行安裝(如果是單機:只需要進行下面1、2操作就安裝完成了)

## 1.下載
wget http://www.emqx.com/zh/downloads/broker/4.4.4/emqx-4.4.4-otp24.1.5-3-el8-amd64.rpm
## 2.安裝
sudo yum install emqx-4.4.4-otp24.1.5-3-el8-amd64.rpm
## 3.修改配置檔案
vim /etc/emqx/emqx.conf
## 4.修改以下內容
## 注意node.name是當前這臺伺服器名稱
node.name = [email protected]
cluster.static.seeds = [email protected],[email protected]
cluster.discovery = static
cluster.name = my-mqtt-cluster

2.分別啟動兩臺伺服器的EMQX

sudo emqx start

3.到瀏覽器輸入 http://xxx.xx.xxx.xxx:18083/ 檢視(隨便一臺都可以,預設賬號admin 密碼public),注意開啟18083,1883 安全組

4.nginx負載均衡

nginx搭建很簡單略過,大家只需要修改以下nginx.conf裡面的內容即可

stream {
  upstream mqtt.zhouhong.icu {
      zone tcp_servers 64k;
      hash $remote_addr;
      server xxx.xx.xxx.xx:1883 weight=1 max_fails=3 fail_timeout=30s;
      server xxx.xx.xxx.xx:1883 weight=1 max_fails=3 fail_timeout=30s;

  }

  server {
      listen 8883 ssl;
      status_zone tcp_server;
      proxy_pass mqtt.zhouhong.icu;
      proxy_buffer_size 4k;
      ssl_handshake_timeout 15s;
      ssl_certificate     /etc/nginx/7967358_www.mqtt.zhouhong.icu.pem;
      ssl_certificate_key /etc/nginx/7967358_www.mqtt.zhouhong.icu.key;
  }
}

與SpringBoot整合並實現伺服器端監控對應topic下的訊息

1.專案搭建

  • 引入MQTT相關jar包

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
  • yml配置檔案 (如果大家沒搭建好的話,可以直接使用我搭建的這個)

server:
  port: 8080

mqtt:
 ## 單機版--只需要把域名改為ip既可 
  hostUrl: tcp://mqtt.zhouhong.icu:1883
  username: admin
  password: public
  ## 服務端 clientId (傳送端自己定義)
  clientId: service_client_id
  cleanSession: true
  reconnect: true
  timeout: 100
  keepAlive: 100
  defaultTopic: topic111
  qos: 0
  • 屬性配置

/**
 * description:
 * date: 2022/6/16 15:51
 * @author: zhouhong
 */
@Component
@ConfigurationProperties("mqtt")
@Data
public class MqttProperties {

    /**
     * 使用者名稱
     */
    private String username;

    /**
     * 密碼
     */
    private String password;

    /**
     * 連線地址
     */
    private String hostUrl;

    /**
     * 客戶端Id,同一臺伺服器下,不允許出現重複的客戶端id
     */
    private String clientId;

    /**
     * 預設連線主題
     */
    private String topic;

    /**
     * 超時時間
     */
    private int timeout;

    /**
     * 設定會話心跳時間 單位為秒 伺服器會每隔1.5*20秒的時間向客戶端
     * 傳送個訊息判斷客戶端是否線上,但這個方法並沒有重連的機制
     */
    private int keepAlive;

    /**
     * 設定是否清空session,這裡如果設定為false表示伺服器會保留客戶端的連
     * 接記錄,這裡設定為true表示每次連線到伺服器都以新的身份連線
     */
    private Boolean cleanSession;

    /**
     * 是否斷線重連
     */
    private Boolean reconnect;

    /**
     * 連線方式
     */
    private Integer qos;
}
  • 傳送訊息回撥

/**
 * description: 發生訊息成功後 的 回撥
 * date: 2022/6/16 15:55
 *
 * @author: zhouhong
 */
@Component
@Log4j2
public class MqttSendCallBack implements MqttCallbackExtended {

    /**
     * 客戶端斷開後觸發
     * @param throwable
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.info("傳送訊息回撥: 連線斷開,可以做重連");
    }

    /**
     * 客戶端收到訊息觸發
     *
     * @param topic       主題
     * @param mqttMessage 訊息
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        log.info("傳送訊息回撥:  接收訊息主題 : " + topic);
        log.info("傳送訊息回撥:  接收訊息內容 : " + new String(mqttMessage.getPayload()));
    }

    /**
     * 釋出訊息成功
     *
     * @param token token
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        String[] topics = token.getTopics();
        for (String topic : topics) {
            log.info("傳送訊息回撥:  向主題:" + topic + "傳送訊息成功!");
        }
        try {
            MqttMessage message = token.getMessage();
            byte[] payload = message.getPayload();
            String s = new String(payload, "UTF-8");
            log.info("傳送訊息回撥:  訊息的內容是:" + s);
        } catch (MqttException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    /**
     * 連線emq伺服器後觸發
     *
     * @param b
     * @param s
     */
    @Override
    public void connectComplete(boolean b, String s) {
        log.info("--------------------ClientId:"
                + MqttAcceptClient.client.getClientId() + "客戶端連線成功!--------------------");
    }
}
  • 接收訊息回撥

/**
 * description: 接收訊息後的回撥
 * date: 2022/6/16 15:52
 *
 * @author: zhouhong
 */
@Component
@Log4j2
public class MqttAcceptCallback implements MqttCallbackExtended {

    @Resource
    private MqttAcceptClient mqttAcceptClient;

    /**
     * 客戶端斷開後觸發
     *
     * @param throwable
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.info("接收訊息回撥:  連線斷開,可以做重連");
        if (MqttAcceptClient.client == null || !MqttAcceptClient.client.isConnected()) {
            log.info("接收訊息回撥:  emqx重新連線....................................................");
            mqttAcceptClient.reconnection();
        }
    }

    /**
     * 客戶端收到訊息觸發
     *
     * @param topic       主題
     * @param mqttMessage 訊息
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        log.info("接收訊息回撥:  接收訊息主題 : " + topic);
        log.info("接收訊息回撥:  接收訊息內容 : " + new String(mqttMessage.getPayload()));
    }

    /**
     * 釋出訊息成功
     *
     * @param token token
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        String[] topics = token.getTopics();
        for (String topic : topics) {
            log.info("接收訊息回撥:  向主題:" + topic + "傳送訊息成功!");
        }
        try {
            MqttMessage message = token.getMessage();
            byte[] payload = message.getPayload();
            String s = new String(payload, "UTF-8");
            log.info("接收訊息回撥:  訊息的內容是:" + s);
        } catch (MqttException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    /**
     * 連線emq伺服器後觸發
     *
     * @param b
     * @param s
     */
    @Override
    public void connectComplete(boolean b, String s) {
        log.info("--------------------ClientId:"
                + MqttAcceptClient.client.getClientId() + "客戶端連線成功!--------------------");
        // 以/#結尾表示訂閱所有以test開頭的主題
        // 訂閱所有機構主題
        mqttAcceptClient.subscribe("topic111", 0);
    }
}
  • 發訊息

/**
 * description: 傳送訊息
 * date: 2022/6/16 16:01
 *
 * @author: zhouhong
 */
@Component
public class MqttSendClient {

    @Autowired
    private MqttSendCallBack mqttSendCallBack;

    @Autowired
    private MqttProperties mqttProperties;

    public MqttClient connect() {
        MqttClient client = null;
        try {
            String uuid = UUID.randomUUID().toString().replaceAll("-","");
            client = new MqttClient(mqttProperties.getHostUrl(),uuid , new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(mqttProperties.getUsername());
            options.setPassword(mqttProperties.getPassword().toCharArray());
            options.setConnectionTimeout(mqttProperties.getTimeout());
            options.setKeepAliveInterval(mqttProperties.getKeepAlive());
            options.setCleanSession(true);
            options.setAutomaticReconnect(false);
            try {
                // 設定回撥
                client.setCallback(mqttSendCallBack);
                client.connect(options);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return client;
    }

    /**
     * 釋出訊息
     * 主題格式: server:report:$orgCode(引數實際使用機構程式碼)
     *
     * @param retained    是否保留
     * @param pushMessage 訊息體
     */
    public void publish(boolean retained, String topic, String pushMessage) {
        MqttMessage message = new MqttMessage();
        message.setQos(mqttProperties.getQos());
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes());
        MqttClient mqttClient = connect();
        try {
            mqttClient.publish(topic, message);
        } catch (MqttException e) {
            e.printStackTrace();
        } finally {
            disconnect(mqttClient);
            close(mqttClient);
        }
    }

    /**
     * 關閉連線
     *
     * @param mqttClient
     */
    public static void disconnect(MqttClient mqttClient) {
        try {
            if (mqttClient != null) {
                mqttClient.disconnect();
            }
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 釋放資源
     *
     * @param mqttClient
     */
    public static void close(MqttClient mqttClient) {
        try {
            if (mqttClient != null) {
                mqttClient.close();
            }
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
  • 接收訊息

/**
 * description: 伺服器段端連線訂閱訊息、監控topic
 * date: 2022/6/16 15:52
 *
 * @author: zhouhong
 */
@Component
@Log4j2
public class MqttAcceptClient {

    @Autowired
    @Lazy
    private MqttAcceptCallback mqttAcceptCallback;

    @Autowired
    private MqttProperties mqttProperties;

    public static MqttClient client;

    private static MqttClient getClient() {
        return client;
    }

    private static void setClient(MqttClient client) {
        MqttAcceptClient.client = client;
    }

    /**
     * 客戶端連線
     */
    public void connect() {
        MqttClient client;
        try {
            // clientId 使用伺服器 yml裡面配置的 clientId
            client = new MqttClient(mqttProperties.getHostUrl(), mqttProperties.getClientId(), new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(mqttProperties.getUsername());
            options.setPassword(mqttProperties.getPassword().toCharArray());
            options.setConnectionTimeout(mqttProperties.getTimeout());
            options.setKeepAliveInterval(mqttProperties.getKeepAlive());
            options.setAutomaticReconnect(mqttProperties.getReconnect());
            options.setCleanSession(mqttProperties.getCleanSession());
            MqttAcceptClient.setClient(client);
            try {
                // 設定回撥
                client.setCallback(mqttAcceptCallback);
                client.connect(options);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 重新連線
     */
    public void reconnection() {
        try {
            client.connect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 訂閱某個主題
     *
     * @param topic 主題
     * @param qos   連線方式
     */
    public void subscribe(String topic, int qos) {
        log.info("==============開始訂閱主題==============" + topic);
        try {
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 取消訂閱某個主題
     *
     * @param topic
     */
    public void unsubscribe(String topic) {
        log.info("==============開始取消訂閱主題==============" + topic);
        try {
            client.unsubscribe(topic);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
  • 服務端啟動時連線訂閱主題並監控

/**
 * description: 啟動後連線 MQTT 伺服器, 監聽 mqtt/my_topic 這個topic傳送的訊息
 * date: 2022/6/16 15:57
 * @author: zhouhong
 */
@Configuration
public class MqttConfig {

    @Resource
    private MqttAcceptClient mqttAcceptClient;

    @Bean
    public MqttAcceptClient getMqttPushClient() {
        mqttAcceptClient.connect();
        return mqttAcceptClient;
    }
}
  • 發訊息控制類

/**
 * description: 發訊息控制類
 * date: 2022/6/16 15:58
 *
 * @author: zhouhong
 */
@RestController
public class SendController {

    @Resource
    private MqttSendClient mqttSendClient;

    @PostMapping("/mqtt/sendmessage")
    public void sendMessage(@RequestBody SendParam sendParam) {
        mqttSendClient.publish(false,sendParam.getTopic(),sendParam.getMessageContent());
    }
}

2.測試

  • postman呼叫發訊息介面

  •  控制檯日誌

  •  使用另外一個移動端MQTT除錯工具測試
  1. 手機端向主題 topic111 傳送訊息,並接收。

2. 控制檯列印