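A Hands-On Guide to Writing an MQTT Gateway
Abstract: The Internet of Things (IoT) is currently a popular area of software. Many IoT vendors run their own IoT platforms, and one of the core modules of such a platform is the MQTT gateway.
This article is shared from the Huawei Cloud community post 《一文帶你掌握物聯網mqtt閘道器搭建背後的技術原理》, by Zhang Jian (張儉).
Preface
IoT is currently a popular area of software. Many IoT vendors run their own IoT platforms, and one of the core modules of such a platform is the MQTT gateway. This article walks through writing an MQTT gateway step by step. The gateway uses Kafka or Pulsar as its backend storage and supports MQTT connect, disconnect, publish, and subscribe. Technology choices:
- Netty: the most popular network framework for Java
- netty-codec-mqtt: a Netty subproject that provides the MQTT codec
- Pulsar/Kafka: popular message middleware, used as the backend storage
The core pom dependencies are as follows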
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-mqtt</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-original</artifactId>
<version>${pulsar.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>${mqtt-client.version}</version>
<scope>test</scope>
</dependency>
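Software parameter design
Configuration parameters are everywhere in software. A complex open-source project can have hundreds of parameters and configuration files thousands of lines long. The configuration we need is:
The port MqttServer listens on
Making the listening port configurable is necessary even for a demo, because it is used constantly in unit tests. After a unit test finishes, the operating system does not release the port immediately, even if the network server has been closed, so it is important for unit tests to bind to a random port. In Java, we can use a utility class like the one below to obtain a free port. If no port is configured, we use the default MQTT port, 1883.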
package io.github.protocol.mqtt.broker.util;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
public class SocketUtil {
public static int getFreePort() {
try (ServerSocket serverSocket = new ServerSocket(0)) {
return serverSocket.getLocalPort();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
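Backend storage configuration
Our MQTT gateway has no reliable storage of its own; it relies on the backend message middleware for persistence. The backend is planned to support two types, Pulsar and Kafka. The enum is defined as follows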
public enum ProcessorType {
KAFKA,
PULSAR,
}
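The corresponding KafkaProcessorConfig and PulsarProcessorConfig are simple and only need to hold the basic connection addresses. Performance tuning and security would add more configuration items later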
@Setter
@Getter
public class KafkaProcessorConfig {
private String bootstrapServers = "localhost:9092";
public KafkaProcessorConfig() {
}
}
@Setter
@Getter
public class PulsarProcessorConfig {
private String httpUrl = "http://localhost:8080";
private String serviceUrl = "pulsar://localhost:6650";
public PulsarProcessorConfig() {
}
}
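The article uses MqttServerConfig throughout but never lists it. Below is a minimal sketch of what such a class might look like, written without Lombok so it stands alone. The class name MqttServerConfigSketch, the range check, and the KAFKA default are assumptions for illustration; the real class also exposes getMqttAuth, getKafkaProcessorConfig, and getPulsarProcessorConfig, which are omitted here.

```java
import java.util.Objects;

// Hypothetical sketch: the article references MqttServerConfig but does not list it.
final class MqttServerConfigSketch {
    /** Default MQTT port, used when no port is configured. */
    static final int DEFAULT_MQTT_PORT = 1883;

    /** Mirrors the ProcessorType enum shown above. */
    enum ProcessorType { KAFKA, PULSAR }

    // 0 is treated as "pick a free port at startup" by the MqttServer constructor.
    private int port = DEFAULT_MQTT_PORT;
    // Assumed default; the article does not say which backend is the default.
    private ProcessorType processorType = ProcessorType.KAFKA;

    int getPort() {
        return port;
    }

    void setPort(int port) {
        // Valid TCP ports are 0..65535; 0 asks for a random free port.
        if (port < 0 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        this.port = port;
    }

    ProcessorType getProcessorType() {
        return processorType;
    }

    void setProcessorType(ProcessorType processorType) {
        this.processorType = Objects.requireNonNull(processorType);
    }
}
```

In a unit test, calling setPort(0) before constructing the server would trigger the free-port lookup, while production code can leave the default of 1883 untouched.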
Starting the Netty MqttServer
We start an MqttServer with Netty and add the MQTT decoder and encoder to the pipeline
package io.github.protocol.mqtt.broker;
import io.github.protocol.mqtt.broker.processor.KafkaProcessor;
import io.github.protocol.mqtt.broker.processor.KafkaProcessorConfig;
import io.github.protocol.mqtt.broker.processor.MqttProcessor;
import io.github.protocol.mqtt.broker.processor.PulsarProcessor;
import io.github.protocol.mqtt.broker.processor.PulsarProcessorConfig;
import io.github.protocol.mqtt.broker.util.SocketUtil;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class MqttServer {
private final MqttServerConfig mqttServerConfig;
public MqttServer() {
this(new MqttServerConfig());
}
public MqttServer(MqttServerConfig mqttServerConfig) {
this.mqttServerConfig = mqttServerConfig;
if (mqttServerConfig.getPort() == 0) {
mqttServerConfig.setPort(SocketUtil.getFreePort());
}
}
public void start() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
// MQTT decoder and encoder; business handlers must be added after them
p.addLast(new MqttDecoder());
p.addLast(MqttEncoder.INSTANCE);
}
});
// Start the server.
ChannelFuture f = b.bind(mqttServerConfig.getPort()).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private MqttProcessor processor(MqttServerConfig config) {
return switch (config.getProcessorType()) {
case KAFKA -> new KafkaProcessor(config.getMqttAuth(), config.getKafkaProcessorConfig());
case PULSAR -> new PulsarProcessor(config.getMqttAuth(), config.getPulsarProcessorConfig());
};
}
public int getPort() {
return mqttServerConfig.getPort();
}
}
MqttServerStarter.java
We write a simple main function to start the MqttServer, which makes debugging easier
package io.github.protocol.mqtt.broker;
public class MqttServerStarter {
public static void main(String[] args) throws Exception {
new MqttServer().start();
}
}
On the client side, we test with the Eclipse Paho MQTT client
package io.github.protocol.mqtt;
import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
@Log4j2
public class MqttClientPublishExample {
public static void main(String[] args) throws Exception {
String topic = "MQTT Examples";
String content = "Message from MqttPublishExample";
int qos = 2;
String broker = "tcp://127.0.0.1:1883";
String clientId = "JavaSample";
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
log.info("Connecting to broker: {}", broker);
sampleClient.connect(connOpts);
log.info("Connected");
log.info("Publishing message: {}", content);
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
sampleClient.publish(topic, message);
log.info("Message published");
sampleClient.disconnect();
log.info("Disconnected");
System.exit(0);
} catch (MqttException me) {
log.error("reason {} msg {}", me.getReasonCode(), me.getMessage(), me);
}
}
}
We then run MqttServer first and MqttClient second, and find that MqttClient hangs
Connecting to broker: tcp://127.0.0.1:1883
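Why? A packet capture shows that only the client sent an MQTT CONNECT packet; the server never responded
According to the MQTT specification, however, a CONNECT message must be answered with a CONNACK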
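For reference, the CONNACK the client is waiting for is only four bytes on the wire in MQTT 3.1.1: a fixed header byte, a remaining length of 2, an acknowledge-flags byte, and a return code. The sketch below builds those bytes by hand to show what the server has to send; the class name ConnAckBytes is made up for illustration, and the gateway itself uses netty-codec-mqtt rather than hand-written encoding.

```java
// Illustrative only: hand-encodes an MQTT 3.1.1 CONNACK packet.
final class ConnAckBytes {
    /** Return code 0 means "connection accepted". */
    static final int CONNECTION_ACCEPTED = 0x00;

    static byte[] encode(boolean sessionPresent, int returnCode) {
        return new byte[] {
            0x20,                                  // packet type 2 (CONNACK) in the high nibble, flags 0
            0x02,                                  // remaining length: always 2 for a 3.1.1 CONNACK
            (byte) (sessionPresent ? 0x01 : 0x00), // acknowledge flags: bit 0 is "session present"
            (byte) returnCode                      // connect return code
        };
    }
}
```

A successful connection without a stored session is therefore the byte sequence 20 02 00 00.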
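So we need to return a CONNACK after receiving the CONNECT. We create an MqttHandler that extends ChannelInboundHandlerAdapter to take over the messages produced by MqttDecoder. The key methods to override are channelRead, and channelInactive, which releases the resources that must be freed when the connection drops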
package com.github.shoothzj.mqtt;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class MqttHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
}
}
Then add this handler to Netty's handler pipeline, after the decoder
Next, insert our code into the MQTT handler
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
if (msg instanceof MqttConnectMessage) {
handleConnect(ctx, (MqttConnectMessage) msg);
} else {
log.error("Unsupported type msg [{}]", msg);
}
}
private void handleConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {
log.info("connect msg is [{}]", connectMessage);
}
The printed connectMessage looks like this
[MqttConnectMessage[fixedHeader=MqttFixedHeader[messageType=CONNECT, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=22], variableHeader=MqttConnectVariableHeader[name=MQTT, version=4, hasUserName=false, hasPassword=false, isWillRetain=false, isWillFlag=false, isCleanSession=true, keepAliveTimeSeconds=60], payload=MqttConnectPayload[clientIdentifier=JavaSample, willTopic=null, willMessage=null, userName=null, password=null]]]
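The remainingLength=22 in the output above can be checked by hand. For a CONNECT with no will, username, or password, the variable header is the length-prefixed protocol name plus one byte each for protocol level and connect flags and two bytes for keep-alive, and the payload is just the length-prefixed client identifier. The sketch below does that arithmetic; ConnectRemainingLength is a made-up helper name, not part of Netty.

```java
import java.nio.charset.StandardCharsets;

// Illustrative only: computes the remaining length of a minimal MQTT CONNECT packet.
final class ConnectRemainingLength {
    /** MQTT strings are encoded as a 2-byte length prefix followed by UTF-8 bytes. */
    static int utf8FieldLength(String s) {
        return 2 + s.getBytes(StandardCharsets.UTF_8).length;
    }

    /** Valid only when the CONNECT carries no will, username, or password. */
    static int compute(String protocolName, String clientId) {
        int variableHeader = utf8FieldLength(protocolName) // "MQTT" -> 2 + 4
                + 1                                        // protocol level (4 for MQTT 3.1.1)
                + 1                                        // connect flags
                + 2;                                       // keep-alive seconds
        int payload = utf8FieldLength(clientId);           // "JavaSample" -> 2 + 10
        return variableHeader + payload;
    }
}
```

With protocol name MQTT and client identifier JavaSample this gives 10 + 12 = 22, matching the decoded fixed header.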
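An MQTT CONNECT message usually carries information such as QoS, username, and password. Because our client was started without a username or password, those fields are null here. For now we skip validation and simply return a CONNACK to the client to indicate that the connection succeeded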
final MqttConnAckMessage ackMessage = MqttMessageBuilders.connAck().returnCode(CONNECTION_ACCEPTED).build();
ctx.channel().writeAndFlush(ackMessage);
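Run the server and client again. The client now gets past the CONNECT phase and moves on to publishing a message. Next we implement the remaining scenarios
The MqttHandler code at this stage is attached below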
package com.github.shoothzj.mqtt;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import lombok.extern.slf4j.Slf4j;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;
@Slf4j
public class MqttHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
if (msg instanceof MqttConnectMessage) {
handleConnect(ctx, (MqttConnectMessage) msg);
} else {
log.error("Unsupported type msg [{}]", msg);
}
}
private void handleConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {
log.info("connect msg is [{}]", connectMessage);
final MqttFixedHeader fixedHeader = connectMessage.fixedHeader();
final MqttConnectVariableHeader variableHeader = connectMessage.variableHeader();
final MqttConnectPayload connectPayload = connectMessage.payload();
final MqttConnAckMessage ackMessage = MqttMessageBuilders.connAck().returnCode(CONNECTION_ACCEPTED).build();
ctx.channel().writeAndFlush(ackMessage);
}
}
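At this point all the logic lives in MqttHandler, which makes later extension awkward. We extract an MqttProcessor interface to handle the concrete requests, while MqttHandler parses the type of each MqttMessage and dispatches it. The MqttProcessor interface is designed as follows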
package io.github.protocol.mqtt.broker.processor;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
public interface MqttProcessor {
void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) throws Exception;
void processConnAck(ChannelHandlerContext ctx, MqttConnAckMessage msg) throws Exception;
void processPublish(ChannelHandlerContext ctx, MqttPublishMessage msg) throws Exception;
void processPubAck(ChannelHandlerContext ctx, MqttPubAckMessage msg) throws Exception;
void processPubRec(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
void processPubRel(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
void processPubComp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) throws Exception;
void processSubAck(ChannelHandlerContext ctx, MqttSubAckMessage msg) throws Exception;
void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage msg) throws Exception;
void processUnsubAck(ChannelHandlerContext ctx, MqttUnsubAckMessage msg) throws Exception;
void processPingReq(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
void processPingResp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
void processDisconnect(ChannelHandlerContext ctx) throws Exception;
void processAuth(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
}
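We allow these methods to throw exceptions. When a failure is very hard to handle (for example, a backend storage failure), we drop the MQTT connection and wait for the client to reconnect.
MqttHandler calls MqttProcessor. The relevant MqttHandler code is as follows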
Preconditions.checkArgument(message instanceof MqttMessage);
MqttMessage msg = (MqttMessage) message;
try {
if (msg.decoderResult().isFailure()) {
Throwable cause = msg.decoderResult().cause();
if (cause instanceof MqttUnacceptableProtocolVersionException) {
// Unsupported protocol version
MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK,
false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(
MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION,
false), null);
ctx.writeAndFlush(connAckMessage);
log.error("connection refused due to invalid protocol, client address [{}]",
ctx.channel().remoteAddress());
ctx.close();
return;
} else if (cause instanceof MqttIdentifierRejectedException) {
// ineligible clientId
MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK,
false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED,
false), null);
ctx.writeAndFlush(connAckMessage);
log.error("ineligible clientId, client address [{}]", ctx.channel().remoteAddress());
ctx.close();
return;
}
throw new IllegalStateException(msg.decoderResult().cause().getMessage());
}
MqttMessageType messageType = msg.fixedHeader().messageType();
if (log.isDebugEnabled()) {
log.debug("Processing MQTT Inbound handler message, type={}", messageType);
}
switch (messageType) {
case CONNECT:
Preconditions.checkArgument(msg instanceof MqttConnectMessage);
processor.processConnect(ctx, (MqttConnectMessage) msg);
break;
case CONNACK:
Preconditions.checkArgument(msg instanceof MqttConnAckMessage);
processor.processConnAck(ctx, (MqttConnAckMessage) msg);
break;
case PUBLISH:
Preconditions.checkArgument(msg instanceof MqttPublishMessage);
processor.processPublish(ctx, (MqttPublishMessage) msg);
break;
case PUBACK:
Preconditions.checkArgument(msg instanceof MqttPubAckMessage);
processor.processPubAck(ctx, (MqttPubAckMessage) msg);
break;
case PUBREC:
processor.processPubRec(ctx, msg);
break;
case PUBREL:
processor.processPubRel(ctx, msg);
break;
case PUBCOMP:
processor.processPubComp(ctx, msg);
break;
case SUBSCRIBE:
Preconditions.checkArgument(msg instanceof MqttSubscribeMessage);
processor.processSubscribe(ctx, (MqttSubscribeMessage) msg);
break;
case SUBACK:
Preconditions.checkArgument(msg instanceof MqttSubAckMessage);
processor.processSubAck(ctx, (MqttSubAckMessage) msg);
break;
case UNSUBSCRIBE:
Preconditions.checkArgument(msg instanceof MqttUnsubscribeMessage);
processor.processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
break;
case UNSUBACK:
Preconditions.checkArgument(msg instanceof MqttUnsubAckMessage);
processor.processUnsubAck(ctx, (MqttUnsubAckMessage) msg);
break;
case PINGREQ:
processor.processPingReq(ctx, msg);
break;
case PINGRESP:
processor.processPingResp(ctx, msg);
break;
case DISCONNECT:
processor.processDisconnect(ctx);
break;
case AUTH:
processor.processAuth(ctx, msg);
break;
default:
throw new UnsupportedOperationException("Unknown MessageType: " + messageType);
}
} catch (Throwable ex) {
ReferenceCountUtil.safeRelease(msg);
log.error("Exception was caught while processing MQTT message, ", ex);
ctx.close();
}
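This code mainly calls a different MqttProcessor method for each MqttMessage type. Two points are worth noting
- Some decoding errors are checked up front so that we fail fast
- Exceptions are caught globally, and the connection is closed when one occurs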
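The messageType that the switch dispatches on comes from the first byte of every MQTT packet: the high nibble is the packet type, and the low nibble carries the DUP, QoS, and RETAIN flags. MqttDecoder does this parsing for us; the sketch below only illustrates the bit layout, and FixedHeaderByte is a made-up name.

```java
// Illustrative only: decodes the first byte of an MQTT fixed header.
final class FixedHeaderByte {
    // Index = packet type value; 0 is reserved, 15 (AUTH) exists only in MQTT 5.
    private static final String[] TYPES = {
        "RESERVED", "CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC", "PUBREL", "PUBCOMP",
        "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK", "PINGREQ", "PINGRESP", "DISCONNECT", "AUTH"
    };

    /** Packet type lives in bits 7..4. */
    static String type(int firstByte) {
        return TYPES[(firstByte >> 4) & 0x0F];
    }

    /** DUP flag is bit 3. */
    static boolean dup(int firstByte) {
        return (firstByte & 0x08) != 0;
    }

    /** QoS level is bits 2..1. */
    static int qos(int firstByte) {
        return (firstByte >> 1) & 0x03;
    }

    /** RETAIN flag is bit 0. */
    static boolean retain(int firstByte) {
        return (firstByte & 0x01) != 0;
    }
}
```

For example, a first byte of 0x32 is a PUBLISH with QoS 1 and neither DUP nor RETAIN set, and 0xC0 is a PINGREQ.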
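Maintaining the MqttSession
The MQTT session is maintained mainly to keep track of client session information and of the resources the client occupies in the system. Because the session has to be maintained whichever backend is used, we build an AbstractProcessor to maintain it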
package io.github.protocol.mqtt.broker.processor;
import io.github.protocol.mqtt.broker.MqttSessionKey;
import io.github.protocol.mqtt.broker.auth.MqttAuth;
import io.github.protocol.mqtt.broker.util.ChannelUtils;
import io.github.protocol.mqtt.broker.util.MqttMessageUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttSubscribePayload;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.util.stream.IntStream;
@Slf4j
public abstract class AbstractProcessor implements MqttProcessor {
protected final MqttAuth mqttAuth;
public AbstractProcessor(MqttAuth mqttAuth) {
this.mqttAuth = mqttAuth;
}
@Override
public void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) throws Exception {
String clientId = msg.payload().clientIdentifier();
String username = msg.payload().userName();
byte[] pwd = msg.payload().passwordInBytes();
if (StringUtils.isBlank(clientId) || StringUtils.isBlank(username)) {
MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK,
false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED,
false), null);
ctx.writeAndFlush(connAckMessage);
log.error("the clientId username pwd cannot be empty, client address[{}]", ctx.channel().remoteAddress());
ctx.close();
return;
}
if (!mqttAuth.connAuth(clientId, username, pwd)) {
MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK,
false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD,
false), null);
ctx.writeAndFlush(connAckMessage);
log.error("auth failed, clientId [{}] username [{}], client address[{}]", clientId, username, ctx.channel().remoteAddress());
ctx.close();
return;
}
MqttSessionKey mqttSessionKey = new MqttSessionKey();
mqttSessionKey.setUsername(username);
mqttSessionKey.setClientId(clientId);
ChannelUtils.setMqttSession(ctx.channel(), mqttSessionKey);
log.info("username {} clientId {} remote address {} connected",
username, clientId, ctx.channel().remoteAddress());
onConnect(mqttSessionKey);
MqttConnAckMessage mqttConnectMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK,
false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, false),
null);
ctx.writeAndFlush(mqttConnectMessage);
}
protected void onConnect(MqttSessionKey mqttSessionKey) {
}
@Override
public void processConnAck(ChannelHandlerContext ctx, MqttConnAckMessage msg) throws Exception {
MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
if (mqttSession == null) {
log.error("conn ack, client address {} not authed", ctx.channel().remoteAddress());
ctx.close();
}
}
@Override
public void processPublish(ChannelHandlerContext ctx, MqttPublishMessage msg) throws Exception {
MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
if (mqttSession == null) {
log.error("publish, client address {} not authed", ctx.channel().remoteAddress());
ctx.close();
return;
}
if (msg.fixedHeader().qosLevel() == MqttQoS.FAILURE) {
log.error("failure. clientId {}, username {} ", mqttSession.getClientId(), mqttSession.getUsername());
return;
}
if (msg.fixedHeader().qosLevel() == MqttQoS.EXACTLY_ONCE) {
log.error("does not support QoS2 protocol. clientId {}, username {} ",
mqttSession.getClientId(), mqttSession.getUsername());
return;
}
onPublish(ctx, mqttSession, msg);
}
protected void onPublish(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
MqttPublishMessage msg) throws Exception {
}
@Override
public void processPubAck(ChannelHandlerContext ctx, MqttPubAckMessage msg) throws Exception {
MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
if (mqttSession == null) {
log.error("pub ack, client address {} not authed", ctx.channel().remoteAddress());
ctx.close();
}
}
@Override
public void processPubRec(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
if (mqttSession == null) {
log.error("pub rec, client address {} not authed", ctx.channel().remoteAddress());
ctx.close();
}
}
@Override
public void processPubRel(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
if (mqttSession == null) {
log.error("pub rel, client address {} not authed", ctx.channel().remoteAddress());
ctx.close();
}
}
@Override
public void processPubComp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
if (mqttSession == null) {
log.error("pub comp, client address {} not authed", ctx.channel().remoteAddress());
ctx.close();
}
}
@Override
public void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) throws Exception {
MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
if (mqttSession == null) {
log.error("sub, client address {} not authed", ctx.channel().remoteAddress());
ctx.close();
// stop here: without a session there is nothing to subscribe on behalf of
return;
}
onSubscribe(ctx, mqttSession, msg.payload());
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBACK,
false, MqttQoS.AT_MOST_ONCE, false, 0);
IntStream intStream = msg.payload().topicSubscriptions().stream().mapToInt(s -> s.qualityOfService().value());
MqttSubAckPayload payload = new MqttSubAckPayload(intStream.toArray());
ctx.writeAndFlush(MqttMessageFactory.newMessage(
fixedHeader,
MqttMessageIdVariableHeader.from(msg.variableHeader().messageId()),
payload));
}
protected void onSubscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
MqttSubscribePayload subscribePayload) throws Exception {
}
@Override
public void processSubAck(ChannelHandlerContext ctx, MqttSubAckMessage msg) throws Exception {
MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
if (mqttSession == null) {
log.error("sub ack, client address {} not authed", ctx.channel().remoteAddress());
ctx.close();
}
}
@Override
public void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage msg) throws Exception {
MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
if (mqttSession == null) {
log.error("unsub, client address {} not authed", ctx.channel().remoteAddress());
ctx.close();
}
}
@Override
public void processUnsubAck(ChannelHandlerContext ctx, MqttUnsubAckMessage msg) throws Exception {
MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
if (mqttSession == null) {
log.error("unsub ack, client address {} not authed", ctx.channel().remoteAddress());
ctx.close();
}
}
@Override
public void processPingReq(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
ctx.writeAndFlush(MqttMessageUtil.pingResp());
}
@Override
public void processPingResp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
if (mqttSession == null) {
log.error("ping resp, client address {} not authed", ctx.channel().remoteAddress());
ctx.close();
}
}
@Override
public void processDisconnect(ChannelHandlerContext ctx) throws Exception {
MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
if (mqttSession == null) {
log.error("disconnect, client address {} not authed", ctx.channel().remoteAddress());
}
onDisconnect(mqttSession);
}
protected void onDisconnect(MqttSessionKey mqttSessionKey) {
}
@Override
public void processAuth(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
if (mqttSession == null) {
log.error("auth, client address {} not authed", ctx.channel().remoteAddress());
ctx.close();
}
}
}
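As you can see, AbstractProcessor mainly maintains and validates the MqttSessionKey, and intercepts the unsupported QoS 2 and FAILURE levels in publish. It also responds to MQTT heartbeat (PINGREQ) requests. As before, onPublish and onSubscribe are allowed to throw exceptions.
The basic idea of an MQTT gateway built on a message queue is simple: when a publish message arrives, produce a message to the message queue; when a client subscribes, pull messages from the message queue. It follows that we may need to maintain the mapping between each MQTT topic and its producer and consumer, because consumers in message middleware such as Kafka and Pulsar are bound to topics. A fragment of the common code follows: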
protected final ReentrantReadWriteLock.ReadLock rLock;
protected final ReentrantReadWriteLock.WriteLock wLock;
protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionProducerMap;
protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionConsumerMap;
protected final Map<MqttTopicKey, P> producerMap;
protected final Map<MqttTopicKey, C> consumerMap;
public AbstractMqProcessor(MqttAuth mqttAuth) {
super(mqttAuth);
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
rLock = lock.readLock();
wLock = lock.writeLock();
this.sessionProducerMap = new HashMap<>();
this.sessionConsumerMap = new HashMap<>();
this.producerMap = new HashMap<>();
this.consumerMap = new HashMap<>();
}
@Override
protected void onConnect(MqttSessionKey mqttSessionKey) {
wLock.lock();
try {
sessionProducerMap.put(mqttSessionKey, new ArrayList<>());
sessionConsumerMap.put(mqttSessionKey, new ArrayList<>());
} finally {
wLock.unlock();
}
}
@Override
protected void onDisconnect(MqttSessionKey mqttSessionKey) {
wLock.lock();
try {
// find producers
List<MqttTopicKey> produceTopicKeys = sessionProducerMap.get(mqttSessionKey);
if (produceTopicKeys != null) {
for (MqttTopicKey mqttTopicKey : produceTopicKeys) {
P producer = producerMap.get(mqttTopicKey);
if (producer != null) {
ClosableUtils.close(producer);
producerMap.remove(mqttTopicKey);
}
}
}
sessionProducerMap.remove(mqttSessionKey);
List<MqttTopicKey> consumeTopicKeys = sessionConsumerMap.get(mqttSessionKey);
if (consumeTopicKeys != null) {
for (MqttTopicKey mqttTopicKey : consumeTopicKeys) {
C consumer = consumerMap.get(mqttTopicKey);
if (consumer != null) {
ClosableUtils.close(consumer);
consumerMap.remove(mqttTopicKey);
}
}
}
sessionConsumerMap.remove(mqttSessionKey);
} finally {
wLock.unlock();
}
}
}
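The maps in this fragment are keyed by MqttSessionKey and MqttTopicKey, so both types need value-based equals and hashCode; otherwise the lookup in onDisconnect would never find the entries stored in onConnect. The article does not list these classes. The sketch below shows one possible shape, using immutable fields for simplicity (the original evidently uses setters); the names SessionKeySketch and TopicKeySketch are placeholders.

```java
import java.util.Objects;

// Hypothetical stand-in for MqttSessionKey: identifies one client session.
final class SessionKeySketch {
    private final String username;
    private final String clientId;

    SessionKeySketch(String username, String clientId) {
        this.username = username;
        this.clientId = clientId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SessionKeySketch)) {
            return false;
        }
        SessionKeySketch other = (SessionKeySketch) o;
        return Objects.equals(username, other.username) && Objects.equals(clientId, other.clientId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(username, clientId);
    }
}

// Hypothetical stand-in for MqttTopicKey: one topic within one session.
final class TopicKeySketch {
    private final SessionKeySketch sessionKey;
    private final String topic;

    TopicKeySketch(SessionKeySketch sessionKey, String topic) {
        this.sessionKey = sessionKey;
        this.topic = topic;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof TopicKeySketch)) {
            return false;
        }
        TopicKeySketch other = (TopicKeySketch) o;
        return Objects.equals(sessionKey, other.sessionKey) && Objects.equals(topic, other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(sessionKey, topic);
    }
}
```

If the keys are mutable, as with setter-based classes, they must not be modified after being put into a HashMap, because a changed hash code makes the entry unreachable.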
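Kafka processor implementation
Because a Kafka producer is not bound to a topic, we can reuse a single producer in the Kafka processor. If a single Kafka producer later reaches its performance limit, we can extend it to a list of Kafka producers selected by round-robin. Consumers are different: the MQTT protocol may require different behavior for each subscribed topic, so sharing one consumer instance is not appropriate. We start the KafkaProducer in the constructor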
private final KafkaProcessorConfig kafkaProcessorConfig;
private final KafkaProducer<String, ByteBuffer> producer;
public KafkaProcessor(MqttAuth mqttAuth, KafkaProcessorConfig kafkaProcessorConfig) {
super(mqttAuth);
this.kafkaProcessorConfig = kafkaProcessorConfig;
this.producer = createProducer();
}
protected KafkaProducer<String, ByteBuffer> createProducer() {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProcessorConfig.getBootstrapServers());
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer.class);
return new KafkaProducer<>(properties);
}
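The round-robin extension mentioned above is not implemented in the article. A minimal sketch of the selection logic, assuming a fixed list of producers created up front, might look like the following; RoundRobin is a made-up generic helper, and in the gateway T would be KafkaProducer<String, ByteBuffer>.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: thread-safe round-robin selection over a fixed list.
final class RoundRobin<T> {
    private final List<T> items;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobin(List<T> items) {
        if (items.isEmpty()) {
            throw new IllegalArgumentException("items must not be empty");
        }
        this.items = List.copyOf(items);
    }

    T next() {
        // floorMod keeps the index non-negative even after the counter overflows.
        return items.get(Math.floorMod(counter.getAndIncrement(), items.size()));
    }
}
```

Because the counter is an AtomicInteger, next() can be called from several Netty worker threads without extra locking.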
Handling MqttPublish messages. An MqttPublish message contains the following key parameters
MqttQoS mqttQoS = publishMessage.fixedHeader().qosLevel();
String topic = publishMessage.variableHeader().topicName();
ByteBuffer byteBuffer = publishMessage.payload().nioBuffer();
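Where
- qos is the quality-of-service level of the message: 0 gives no guarantee (at most once), 1 means at least once, and 2 means exactly once. Only QoS 0 and QoS 1 are currently supported
- topicName is the name of the topic
- ByteBuffer is the content of the message
Messages are sent according to topic and qos, as follows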
String topic = msg.variableHeader().topicName();
ProducerRecord<String, ByteBuffer> record = new ProducerRecord<>(topic, msg.payload().nioBuffer());
switch (msg.fixedHeader().qosLevel()) {
case AT_MOST_ONCE -> producer.send(record, (metadata, exception) -> {
if (exception != null) {
log.error("mqttSessionKey {} send message to kafka error", mqttSessionKey, exception);
return;
}
log.debug("mqttSessionKey {} send message to kafka success, topic {}, partition {}, offset {}",
mqttSessionKey, metadata.topic(), metadata.partition(), metadata.offset());
});
case AT_LEAST_ONCE -> {
try {
RecordMetadata recordMetadata = producer.send(record).get();
log.info("mqttSessionKey {} send message to kafka success, topic {}, partition {}, offset {}",
mqttSessionKey, recordMetadata.topic(),
recordMetadata.partition(), recordMetadata.offset());
ctx.writeAndFlush(MqttMessageUtil.pubAckMessage(msg.variableHeader().packetId()));
} catch (Exception e) {
log.error("mqttSessionKey {} send message to kafka error", mqttSessionKey, e);
}
}
case EXACTLY_ONCE, FAILURE -> throw new IllegalStateException(
String.format("mqttSessionKey %s can not reach here", mqttSessionKey));
}
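In the QoS 1 branch, the PUBACK is sent only after Kafka has confirmed the write, which is what gives the client its at-least-once guarantee. The article does not show MqttMessageUtil.pubAckMessage; on the wire a PUBACK in MQTT 3.1.1 is four bytes, echoing the packet identifier of the PUBLISH it acknowledges. The sketch below hand-encodes it purely to illustrate the format; PubAckBytes is a made-up name.

```java
// Illustrative only: hand-encodes an MQTT 3.1.1 PUBACK packet.
final class PubAckBytes {
    static byte[] encode(int packetId) {
        // Packet identifiers are non-zero 16-bit values.
        if (packetId < 1 || packetId > 0xFFFF) {
            throw new IllegalArgumentException("packetId out of range: " + packetId);
        }
        return new byte[] {
            0x40,                    // packet type 4 (PUBACK) in the high nibble, flags 0
            0x02,                    // remaining length: the 2-byte packet identifier
            (byte) (packetId >> 8),  // packet identifier, most significant byte
            (byte) (packetId & 0xFF) // packet identifier, least significant byte
        };
    }
}
```

QoS 0 publishes carry no packet identifier and get no acknowledgement, which is why that branch sends asynchronously and only logs the result.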
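Handling subscribe messages. For now we simply consume from the topic named in the subscription, creating it if it does not exist. The consumption pattern recommended for the native Kafka client is a blocking loop like this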
while (true) {
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, byte[]> record : records) {
// do logic
}
}
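Because this loop blocks, we need to consume messages on a separate thread. We write a wrapper, KafkaConsumerListenerWrapper, that turns it into an asynchronous listener-based consumption model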
package io.github.protocol.mqtt.broker.processor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
@Slf4j
public class KafkaConsumerListenerWrapper implements AutoCloseable {
private final AdminClient adminClient;
private final KafkaConsumer<String, byte[]> consumer;
public KafkaConsumerListenerWrapper(KafkaProcessorConfig config, String username) {
Properties adminProperties = new Properties();
adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
this.adminClient = KafkaAdminClient.create(adminProperties);
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, username);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
this.consumer = new KafkaConsumer<>(properties);
}
public void start(String topic, KafkaMessageListener listener) throws Exception {
try {
TopicDescription topicDescription = adminClient.describeTopics(Collections.singletonList(topic))
.values().get(topic).get();
log.info("topic info is {}", topicDescription);
} catch (ExecutionException ee) {
if (ee.getCause() instanceof UnknownTopicOrPartitionException) {
log.info("topic {} not exist, create it", topic);
adminClient.createTopics(Collections.singletonList(new NewTopic(topic, 1, (short) 1)));
} else {
log.error("find topic info {} error", topic, ee);
}
} catch (Exception e) {
throw new IllegalStateException("find topic info error", e);
}
consumer.subscribe(Collections.singletonList(topic));
log.info("consumer topic {} start", topic);
new Thread(() -> {
try {
while (true) {
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, byte[]> record : records) {
listener.messageReceived(record);
}
}
} catch (WakeupException we) {
consumer.close();
} catch (Exception e) {
log.error("consumer topic {} consume error", topic, e);
consumer.close();
}
}).start();
Thread.sleep(5_000);
}
@Override
public void close() throws Exception {
log.info("wake up {} consumer", consumer);
consumer.wakeup();
// also release the admin client created in the constructor
adminClient.close();
}
}
@Override
protected void onSubscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
MqttSubscribePayload subscribePayload) throws Exception {
for (MqttTopicSubscription topicSubscription : subscribePayload.topicSubscriptions()) {
KafkaConsumerListenerWrapper consumer = createConsumer(mqttSessionKey, topicSubscription.topicName());
subscribe(ctx, consumer, topicSubscription.topicName());
}
}
private KafkaConsumerListenerWrapper createConsumer(MqttSessionKey mqttSessionKey, String topic) {
MqttTopicKey mqttTopicKey = new MqttTopicKey();
mqttTopicKey.setTopic(topic);
mqttTopicKey.setMqttSessionKey(mqttSessionKey);
wLock.lock();
try {
KafkaConsumerListenerWrapper consumer = consumerMap.get(mqttTopicKey);
if (consumer == null) {
consumer = new KafkaConsumerListenerWrapper(kafkaProcessorConfig, mqttSessionKey.getUsername());
sessionConsumerMap.compute(mqttSessionKey, (mqttSessionKey1, mqttTopicKeys) -> {
if (mqttTopicKeys == null) {
mqttTopicKeys = new ArrayList<>();
}
mqttTopicKeys.add(mqttTopicKey);
return mqttTopicKeys;
});
consumerMap.put(mqttTopicKey, consumer);
}
return consumer;
} finally {
wLock.unlock();
}
}
protected void subscribe(ChannelHandlerContext ctx,
KafkaConsumerListenerWrapper consumer, String topic) throws Exception {
BoundInt boundInt = new BoundInt(65535);
consumer.start(topic, record -> {
log.info("receive message from kafka, topic {}, partition {}, offset {}",
record.topic(), record.partition(), record.offset());
MqttPublishMessage mqttPublishMessage = MqttMessageUtil.publishMessage(
MqttQoS.AT_LEAST_ONCE, topic, boundInt.nextVal(), record.value());
ctx.writeAndFlush(mqttPublishMessage);
});
}
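One point in the code above deserves attention throughout: when printing logs, carry the key information, such as the topic, the MQTT username, and the MQTT clientId. This does not seem to matter while writing a demo, but once you have to track down a problem under massive request volume, it becomes clear how critical this information is.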
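One way to keep that information consistent is to build the log context in a single place. The following is a minimal sketch, not code from the project: the class name `LogContext` and its method are hypothetical, and the format simply mirrors the `clientId [..], username [..]` style used in the log statements of this article.

```java
/**
 * Hypothetical helper (not part of the article's code base) that renders the
 * identifying fields of an MQTT session in one consistent format.
 */
final class LogContext {
    private LogContext() {
    }

    /** Builds a prefix such as: clientId [c1], username [u1], topic [t1] */
    static String of(String clientId, String username, String topic) {
        return String.format("clientId [%s], username [%s], topic [%s]", clientId, username, topic);
    }

    public static void main(String[] args) {
        // Prepend the context to any message so that every line can be filtered by session
        System.out.println(LogContext.of("device-001", "alice", "sensors/temp") + ". send message success");
    }
}
```

Every log line produced this way can be filtered by client, user, or topic with a plain text search.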
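The simple BoundInt utility class generates packet ids in the range 0 to 65535, which fits the 16-bit identifier that the protocol requires. Note that MQTT requires the packet identifier of a QoS 1 PUBLISH to be non-zero, so 0 itself should be skipped.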
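The source of BoundInt is not shown in this section, so the following is only a sketch of what such a generator might look like. The class name `PacketIdGenerator` is hypothetical, and starting at 1 rather than 0 is an assumption made here to keep identifiers non-zero.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical stand-in for the article's BoundInt helper.
 * Hands out identifiers 1, 2, ..., max and then wraps around to 1 again.
 */
final class PacketIdGenerator {
    private final int max;
    private final AtomicInteger counter = new AtomicInteger(0);

    PacketIdGenerator(int max) {
        if (max < 1) {
            throw new IllegalArgumentException("max must be >= 1");
        }
        this.max = max;
    }

    /** Returns the next identifier in the cycle 1..max. */
    int nextVal() {
        // updateAndGet applies the step atomically, so concurrent callers get distinct steps
        return counter.updateAndGet(current -> current >= max ? 1 : current + 1);
    }

    public static void main(String[] args) {
        PacketIdGenerator ids = new PacketIdGenerator(65535);
        System.out.println(ids.nextVal());
        System.out.println(ids.nextVal());
    }
}
```

A wrapping counter like this does not track which identifiers are still waiting for a PUBACK, which is acceptable for a demo but would need attention in a production gateway.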
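Pulsar processor implementation
Compared with Kafka, Pulsar is better suited to act as the backend of an MQTT proxy, for the following reasons:
- Pulsar supports millions of topics, and its topics are more lightweight
- Pulsar natively supports a listener-based consumption mode, so there is no need to start one thread per consumer
- Pulsar supports the Shared subscription mode, which makes consumption more flexible
- A Pulsar consumer's subscribe call guarantees that the subscription has been created successfully, whereas the Kafka consumer offers no such semantic guarantee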
protected final ReentrantReadWriteLock.ReadLock rLock;
protected final ReentrantReadWriteLock.WriteLock wLock;
protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionProducerMap;
protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionConsumerMap;
protected final Map<MqttTopicKey, Producer<byte[]>> producerMap;
protected final Map<MqttTopicKey, Consumer<byte[]>> consumerMap;
private final PulsarProcessorConfig pulsarProcessorConfig;
private final PulsarAdmin pulsarAdmin;
private final PulsarClient pulsarClient;
public PulsarProcessor(MqttAuth mqttAuth, PulsarProcessorConfig pulsarProcessorConfig) {
super(mqttAuth);
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
rLock = lock.readLock();
wLock = lock.writeLock();
this.sessionProducerMap = new HashMap<>();
this.sessionConsumerMap = new HashMap<>();
this.producerMap = new HashMap<>();
this.consumerMap = new HashMap<>();
this.pulsarProcessorConfig = pulsarProcessorConfig;
try {
this.pulsarAdmin = PulsarAdmin.builder()
.serviceHttpUrl(pulsarProcessorConfig.getHttpUrl())
.build();
this.pulsarClient = PulsarClient.builder()
.serviceUrl(pulsarProcessorConfig.getServiceUrl())
.build();
} catch (Exception e) {
throw new IllegalStateException("Failed to create pulsar client", e);
}
}
Handling publish messages
@Override
protected void onPublish(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
MqttPublishMessage msg) throws Exception {
String topic = msg.variableHeader().topicName();
Producer<byte[]> producer = getOrCreateProducer(mqttSessionKey, topic);
int len = msg.payload().readableBytes();
byte[] messageBytes = new byte[len];
msg.payload().getBytes(msg.payload().readerIndex(), messageBytes);
switch (msg.fixedHeader().qosLevel()) {
case AT_MOST_ONCE -> producer.sendAsync(messageBytes).
thenAccept(messageId -> log.info("clientId [{}],"
+ " username [{}]. send message to pulsar success messageId: {}",
mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), messageId))
.exceptionally((e) -> {
log.error("clientId [{}], username [{}]. send message to pulsar fail: ",
mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), e);
return null;
});
case AT_LEAST_ONCE -> {
try {
MessageId messageId = producer.send(messageBytes);
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK,
false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttPubAckMessage pubAckMessage = (MqttPubAckMessage) MqttMessageFactory.newMessage(fixedHeader,
MqttMessageIdVariableHeader.from(msg.variableHeader().packetId()), null);
log.info("clientId [{}], username [{}]. send pulsar success. messageId: {}",
mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), messageId);
ctx.writeAndFlush(pubAckMessage);
} catch (PulsarClientException e) {
log.error("clientId [{}], username [{}]. send pulsar error: {}",
mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), e.getMessage());
}
}
case EXACTLY_ONCE, FAILURE -> throw new IllegalStateException(
String.format("mqttSessionKey %s can not reach here", mqttSessionKey));
}
}
private Producer<byte[]> getOrCreateProducer(MqttSessionKey mqttSessionKey, String topic) throws Exception {
MqttTopicKey mqttTopicKey = new MqttTopicKey();
mqttTopicKey.setTopic(topic);
mqttTopicKey.setMqttSessionKey(mqttSessionKey);
rLock.lock();
try {
Producer<byte[]> producer = producerMap.get(mqttTopicKey);
if (producer != null) {
return producer;
}
} finally {
rLock.unlock();
}
wLock.lock();
try {
Producer<byte[]> producer = producerMap.get(mqttTopicKey);
if (producer == null) {
producer = createProducer(topic);
sessionProducerMap.compute(mqttSessionKey, (mqttSessionKey1, mqttTopicKeys) -> {
if (mqttTopicKeys == null) {
mqttTopicKeys = new ArrayList<>();
}
mqttTopicKeys.add(mqttTopicKey);
return mqttTopicKeys;
});
producerMap.put(mqttTopicKey, producer);
}
return producer;
} finally {
wLock.unlock();
}
}
protected Producer<byte[]> createProducer(String topic) throws Exception {
return pulsarClient.newProducer(Schema.BYTES).topic(topic).create();
}
Handling subscribe messages
@Override
protected void onSubscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
MqttSubscribePayload subscribePayload) throws Exception {
for (MqttTopicSubscription topicSubscription : subscribePayload.topicSubscriptions()) {
subscribe(ctx, mqttSessionKey, topicSubscription.topicName());
}
}
protected void subscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
String topic) throws Exception {
MqttTopicKey mqttTopicKey = new MqttTopicKey();
mqttTopicKey.setTopic(topic);
mqttTopicKey.setMqttSessionKey(mqttSessionKey);
wLock.lock();
try {
Consumer<byte[]> consumer = consumerMap.get(mqttTopicKey);
if (consumer == null) {
consumer = createConsumer(ctx, mqttSessionKey.getUsername(), topic);
sessionConsumerMap.compute(mqttSessionKey, (mqttSessionKey1, mqttTopicKeys) -> {
if (mqttTopicKeys == null) {
mqttTopicKeys = new ArrayList<>();
}
mqttTopicKeys.add(mqttTopicKey);
return mqttTopicKeys;
});
consumerMap.put(mqttTopicKey, consumer);
}
} finally {
wLock.unlock();
}
}
protected Consumer<byte[]> createConsumer(ChannelHandlerContext ctx, String username,
String topic) throws Exception {
BoundInt boundInt = new BoundInt(65535);
try {
PartitionedTopicStats partitionedStats = pulsarAdmin.topics().getPartitionedStats(topic, false);
log.info("topic {} partitioned stats {}", topic, partitionedStats);
} catch (PulsarAdminException.NotFoundException nfe) {
log.info("topic {} not found", topic);
pulsarAdmin.topics().createPartitionedTopic(topic, 1);
}
return pulsarClient.newConsumer(Schema.BYTES).topic(topic)
.messageListener((consumer, msg) -> {
log.info("receive message from pulsar, topic {}, message {}", topic, msg.getMessageId());
MqttPublishMessage mqttPublishMessage = MqttMessageUtil.publishMessage(
MqttQoS.AT_LEAST_ONCE, topic, boundInt.nextVal(), msg.getData());
ctx.writeAndFlush(mqttPublishMessage);
})
.subscriptionName(username).subscribe();
}
Integration tests
Kafka
We can use the embedded-kafka-java project to start a Kafka broker for unit tests. Add the dependency with the following coordinates
<dependency>
<groupId>io.github.embedded-middleware</groupId>
<artifactId>embedded-kafka-core</artifactId>
<version>0.0.2</version>
<scope>test</scope>
</dependency>
With that in place, the following code starts a Kafka-backed MQTT broker
@Slf4j
public class MqttKafkaTestUtil {
public static MqttServer setupMqttKafka() throws Exception {
EmbeddedKafkaServer embeddedKafkaServer = new EmbeddedKafkaServer();
new Thread(() -> {
try {
embeddedKafkaServer.start();
} catch (Exception e) {
log.error("kafka broker started exception ", e);
}
}).start();
Thread.sleep(5_000);
MqttServerConfig mqttServerConfig = new MqttServerConfig();
mqttServerConfig.setPort(0);
mqttServerConfig.setProcessorType(ProcessorType.KAFKA);
KafkaProcessorConfig kafkaProcessorConfig = new KafkaProcessorConfig();
kafkaProcessorConfig.setBootstrapServers(String.format("localhost:%d", embeddedKafkaServer.getKafkaPort()));
mqttServerConfig.setKafkaProcessorConfig(kafkaProcessorConfig);
MqttServer mqttServer = new MqttServer(mqttServerConfig);
new Thread(() -> {
try {
mqttServer.start();
} catch (Exception e) {
log.error("mqsar broker started exception ", e);
}
}).start();
Thread.sleep(5000L);
return mqttServer;
}
}
The Kafka end-to-end test case is fairly simple: publish one message through the MQTT client, then consume it back
@Log4j2
public class MqttKafkaPubSubTest {
@Test
public void pubSubTest() throws Exception {
MqttServer mqttServer = MqttKafkaTestUtil.setupMqttKafka();
String topic = UUID.randomUUID().toString();
String content = "test-msg";
String broker = String.format("tcp://localhost:%d", mqttServer.getPort());
String clientId = UUID.randomUUID().toString();
MemoryPersistence persistence = new MemoryPersistence();
MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(UUID.randomUUID().toString());
connOpts.setPassword(UUID.randomUUID().toString().toCharArray());
connOpts.setCleanSession(true);
log.info("Mqtt connecting to broker");
sampleClient.connect(connOpts);
CompletableFuture<String> future = new CompletableFuture<>();
log.info("Mqtt subscribing");
sampleClient.subscribe(topic, (s, mqttMessage) -> {
log.info("messageArrived");
future.complete(mqttMessage.toString());
});
log.info("Mqtt subscribed");
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(1);
log.info("Mqtt message publishing");
sampleClient.publish(topic, message);
log.info("Mqtt message published");
TimeUnit.SECONDS.sleep(3);
sampleClient.disconnect();
String msg = future.get(5, TimeUnit.SECONDS);
Assertions.assertEquals(content, msg);
}
}
Pulsar
We can use the embedded-pulsar-java project to start a Pulsar broker for unit tests. Add the dependency with the following coordinates
<dependency>
<groupId>io.github.embedded-middleware</groupId>
<artifactId>embedded-pulsar-core</artifactId>
<version>0.0.2</version>
<scope>test</scope>
</dependency>
With that in place, the following code starts a Pulsar-backed MQTT broker
@Slf4j
public class MqttPulsarTestUtil {
public static MqttServer setupMqttPulsar() throws Exception {
EmbeddedPulsarServer embeddedPulsarServer = new EmbeddedPulsarServer();
embeddedPulsarServer.start();
MqttServerConfig mqttServerConfig = new MqttServerConfig();
mqttServerConfig.setPort(0);
mqttServerConfig.setProcessorType(ProcessorType.PULSAR);
PulsarProcessorConfig pulsarProcessorConfig = new PulsarProcessorConfig();
pulsarProcessorConfig.setHttpUrl(String.format("http://localhost:%d", embeddedPulsarServer.getWebPort()));
pulsarProcessorConfig.setServiceUrl(String.format("pulsar://localhost:%d", embeddedPulsarServer.getTcpPort()));
mqttServerConfig.setPulsarProcessorConfig(pulsarProcessorConfig);
MqttServer mqttServer = new MqttServer(mqttServerConfig);
new Thread(() -> {
try {
mqttServer.start();
} catch (Exception e) {
log.error("mqsar broker started exception ", e);
}
}).start();
Thread.sleep(5000L);
return mqttServer;
}
}
The Pulsar end-to-end test case is fairly simple: publish one message through the MQTT client, then consume it back
@Log4j2
public class MqttPulsarPubSubTest {
@Test
public void pubSubTest() throws Exception {
MqttServer mqttServer = MqttPulsarTestUtil.setupMqttPulsar();
String topic = UUID.randomUUID().toString();
String content = "test-msg";
String broker = String.format("tcp://localhost:%d", mqttServer.getPort());
String clientId = UUID.randomUUID().toString();
MemoryPersistence persistence = new MemoryPersistence();
MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(UUID.randomUUID().toString());
connOpts.setPassword(UUID.randomUUID().toString().toCharArray());
connOpts.setCleanSession(true);
log.info("Mqtt connecting to broker");
sampleClient.connect(connOpts);
CompletableFuture<String> future = new CompletableFuture<>();
log.info("Mqtt subscribing");
sampleClient.subscribe(topic, (s, mqttMessage) -> {
log.info("messageArrived");
future.complete(mqttMessage.toString());
});
log.info("Mqtt subscribed");
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(1);
log.info("Mqtt message publishing");
sampleClient.publish(topic, message);
log.info("Mqtt message published");
TimeUnit.SECONDS.sleep(3);
sampleClient.disconnect();
String msg = future.get(5, TimeUnit.SECONDS);
Assertions.assertEquals(content, msg);
}
}
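Performance optimization
Here we briefly describe a few performance optimizations. Parameter tuning such as thread counts and buffer sizes is not covered, because those settings have to be decided by concrete load tests.
Use the Epoll network model on Linux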
public class EventLoopUtil {
/**
* @return an EventLoopGroup suitable for the current platform
*/
public static EventLoopGroup newEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
if (Epoll.isAvailable()) {
return new EpollEventLoopGroup(nThreads, threadFactory);
} else {
return new NioEventLoopGroup(nThreads, threadFactory);
}
}
public static Class<? extends ServerSocketChannel> getServerSocketChannelClass(EventLoopGroup eventLoopGroup) {
if (eventLoopGroup instanceof EpollEventLoopGroup) {
return EpollServerSocketChannel.class;
} else {
return NioServerSocketChannel.class;
}
}
}
Epoll.isAvailable decides which event loop group to create, and when specifying the channel type, the type of the group determines the matching channel class
EventLoopGroup acceptorGroup = EventLoopUtil.newEventLoopGroup(1,
new DefaultThreadFactory("mqtt-acceptor"));
EventLoopGroup workerGroup = EventLoopUtil.newEventLoopGroup(1,
new DefaultThreadFactory("mqtt-worker"));
b.group(acceptorGroup, workerGroup)
// key point
.channel(EventLoopUtil.getServerSocketChannelClass(workerGroup))
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
// decoder
p.addLast(new MqttDecoder());
p.addLast(MqttEncoder.INSTANCE);
p.addLast(new MqttHandler(processor(mqttServerConfig)));
}
});
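Turn off TCP keepalive
Because the MQTT protocol has its own heartbeat mechanism, TCP keepalive can be turned off and the gateway can rely on the protocol-level heartbeat alone, which saves resources when there are massive numbers of connections. Setting ChannelOption.SO_KEEPALIVE to false is enough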
// on a ServerBootstrap, options for accepted connections are set with childOption
.childOption(ChannelOption.SO_KEEPALIVE, false)
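Relying on the MQTT heartbeat means the gateway itself has to notice silent clients. MQTT 3.1.1 has the server close the connection when no control packet arrives within one and a half times the keep alive value sent in CONNECT, and a value of 0 turns the mechanism off. The sketch below only computes that deadline; the class name `MqttKeepAlive` is hypothetical, and wiring the result into the pipeline (for example through Netty's IdleStateHandler) is left out.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that derives the broker-side idle deadline from the CONNECT keep alive. */
final class MqttKeepAlive {
    private MqttKeepAlive() {
    }

    /**
     * @param keepAliveSeconds the keep alive value from the CONNECT packet (0..65535)
     * @return the idle time in milliseconds after which the client may be disconnected,
     *         or 0 when the client disabled keep alive
     */
    static long idleTimeoutMillis(int keepAliveSeconds) {
        if (keepAliveSeconds < 0 || keepAliveSeconds > 65535) {
            throw new IllegalArgumentException("keep alive must be in 0..65535");
        }
        if (keepAliveSeconds == 0) {
            return 0L; // keep alive disabled by the client
        }
        // one and a half times the keep alive interval
        return TimeUnit.SECONDS.toMillis(keepAliveSeconds) * 3 / 2;
    }

    public static void main(String[] args) {
        System.out.println(MqttKeepAlive.idleTimeoutMillis(60));
    }
}
```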
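Shorten the timeouts
By default, the timeouts of the MQTT client used in the unit tests, as well as the send timeouts of the Pulsar producer and the Kafka producer, are relatively long (typically 30s). When deploying on an internal network, they can be lowered to 5s to avoid pointless waiting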
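As an illustration for the Kafka side, the sketch below builds the timeout-related producer properties using their plain string keys, so it runs without the Kafka client on the classpath. The class and method names are hypothetical. Splitting the budget so that `request.timeout.ms` is half of `delivery.timeout.ms` is a choice made here, because Kafka rejects a configuration in which `delivery.timeout.ms` is smaller than `linger.ms` plus `request.timeout.ms`.

```java
import java.util.Properties;

/** Hypothetical helper that collects shortened Kafka producer timeouts. */
final class ShortTimeouts {
    private ShortTimeouts() {
    }

    static Properties kafkaProducerTimeouts(int totalTimeoutMs) {
        Properties props = new Properties();
        // upper bound for reporting success or failure of a send, retries included
        props.put("delivery.timeout.ms", String.valueOf(totalTimeoutMs));
        // wait for a single request; kept at half the budget to leave room for linger.ms
        props.put("request.timeout.ms", String.valueOf(totalTimeoutMs / 2));
        // how long send() may block waiting for metadata or buffer space
        props.put("max.block.ms", String.valueOf(totalTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(ShortTimeouts.kafkaProducerTimeouts(5000));
    }
}
```

These properties would be merged into the ones passed to the KafkaProducer constructor. For the other two clients, the Pulsar producer builder has `sendTimeout(5, TimeUnit.SECONDS)` and the Paho `MqttConnectOptions` has `setConnectionTimeout(5)`.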
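Use multiple KafkaProducers to improve performance
A single KafkaProducer can hit the bandwidth limit of its TCP link. When request volume is massive and Kafka produce latency stands out, consider starting multiple KafkaProducers and, given the traffic pattern of MQTT (many connections, low QPS on each one), use the hash of the mqttSessionKey to decide which KafkaProducer sends the message
Add the following setting to KafkaProcessorConfig: the number of producers, 1 by default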
private int producerNum = 1;
At initialization, create an array of producers instead of a single producer
this.producerArray = new KafkaProducer[kafkaProcessorConfig.getProducerNum()];
for (int i = 0; i < kafkaProcessorConfig.getProducerNum(); i++) {
producerArray[i] = createProducer();
}
Wrap producer lookup in a method
private Producer<String, ByteBuffer> getProducer(MqttSessionKey mqttSessionKey) {
return producerArray[Math.abs(mqttSessionKey.hashCode() % kafkaProcessorConfig.getProducerNum())];
}
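The routing idea can be tried out without Kafka. The sketch below is a standalone variant with a hypothetical class name, using `Math.floorMod`, which always returns a value in `0..producerNum-1` even for negative hash codes. It maps the same session key to the same slot every time, so messages from one connection keep going through one producer.

```java
/** Hypothetical, Kafka-free illustration of picking a producer slot by session hash. */
final class ProducerSelector {
    private ProducerSelector() {
    }

    /** Maps any key to a stable index in the range 0..producerNum-1. */
    static int indexFor(Object sessionKey, int producerNum) {
        if (producerNum <= 0) {
            throw new IllegalArgumentException("producerNum must be positive");
        }
        // floorMod never returns a negative result for a positive divisor
        return Math.floorMod(sessionKey.hashCode(), producerNum);
    }

    public static void main(String[] args) {
        // The same key always lands on the same slot
        System.out.println(ProducerSelector.indexFor("client-1", 4));
        System.out.println(ProducerSelector.indexFor("client-1", 4));
    }
}
```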
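Conclusion
All the code in this article has been uploaded to GitHub. We have only implemented basic MQTT connect, publish, and subscribe here, and do not even support pausing or unsubscribing. A mature, commercial-grade MQTT gateway also needs user isolation, broader protocol support, reliability, operability, flow control, security, and more. If you have production-grade MQTT requirements and cannot build a mature gateway quickly, you can choose the Huawei Cloud IoTDA service, which provides stable and reliable MQTT service and supports connecting massive numbers of devices to the cloud as well as bidirectional messaging between devices and the cloud.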