學習RocketMQ
的第一天,應該從官網的QuickStart
案例開始,這一節就來介紹一下如何部署單機RocketMQ
以及進行訊息的收發。
0. 版本說明
使用RocketMQ
需要有如下的硬體要求:
64位作業系統 JDK 1.8+ Maven 3.2.x Git 4GB+ 硬碟空間(broker 儲存需要)
瞭解版本說明之後,我們就可以開始進行實戰了。
1. 獲取原始碼
開啟RocketMQ
在Github
上的主頁,獲取倉庫地址。然後在本地電腦上克隆本倉庫。
git clone https://github.com/apache/rocketmq.git
複製程式碼
2. 啟動伺服器
2.1 啟動 nameserver
開啟專案後,第一步要做的是啟動nameserver
,這是RocketMQ
的路由中心,它提供輕量級服務發現和路由,主要的作用是儲存路由資訊,管理broker
節點,包括路由的查詢、註冊和刪除。
在RocketMQ
工程的namesrv
包中找到入口類org.apache.rocketmq.namesrv.NamesrvStartup
,執行這個類的main
函式,發現報錯了。
Please set the ROCKETMQ_HOME variable in your environment to match the location of the RocketMQ installation
複製程式碼
這個報錯是因為在為nameserver
設定相關配置時沒有設定成功。
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
複製程式碼
ROCKETMQ_HOME
環境變數主要用於設定nameserver
的配置,只需要將包含conf
配置目錄的這個路徑賦值給環境變數ROCKETMQ_HOME
即可,如下圖。
再次執行main
函式,就會發現啟動成功。
The Name Server boot success. serializeType=JSON
複製程式碼
2.2 啟動 broker
接下來要啟動的是broker
,它主要用於訊息儲存,接收和傳送。
同樣在RocketMQ
工程的broker
包中找到入口類org.apache.rocketmq.broker.BrokerStartup
,但是與啟動nameserver
不同的是,啟動broker
時需要指定註冊的nameserver
地址,在啟動命令中輸入-n 127.0.0.1:9876
即可。
執行main
函式,如果發現與之前一樣的報錯,重新設定該Application
環境變數即可,執行成功的輸出如下。
The broker[daxiongMac.local, 192.168.31.126:10911] boot success. serializeType=JSON and name server is namesrvAddr=127.0.0.1:9876
複製程式碼
至此,RocketMQ
的路由中心和接收發訊息的伺服器就啟動成功了,我們可以通過nameserver
和broker
來進行訊息傳遞了。
3. 啟動 Producer 傳送訊息
找到example
包的org.apache.rocketmq.example.quickstart.Producer
類,這是一個最簡單的訊息生產者,我們來看一下它的原始碼。
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 1000; i++) {
try {
Message msg = new Message("TopicTest","TagA",("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}
複製程式碼
使用 DefaultMQProducer
類來建立生產者例項,並指定訊息組Group
和路由中心地址。啟動生產者例項。 建立訊息,指定 Topic
和Tag
(用於區分訊息的類別)。傳送訊息。 關閉生產者例項。
21:22:35.450 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
SendResult [sendStatus=SEND_OK, msgId=C0A81F7E8D39330BEDB41E560E4E0000, offsetMsgId=C0A81F7E00002A9F0000000000068BA2, messageQueue=MessageQueue [topic=TopicTest, brokerName=daxiongMac.local, queueId=1], queueOffset=500]
SendResult [sendStatus=SEND_OK, msgId=C0A81F7E8D39330BEDB41E560E810001, offsetMsgId=C0A81F7E00002A9F0000000000068C54, messageQueue=MessageQueue [topic=TopicTest, brokerName=daxiongMac.local, queueId=2], queueOffset=500]
SendResult [sendStatus=SEND_OK, msgId=C0A81F7E8D39330BEDB41E560E850002, offsetMsgId=C0A81F7E00002A9F0000000000068D06, messageQueue=MessageQueue [topic=TopicTest, brokerName=daxiongMac.local, queueId=3], queueOffset=500]
......
複製程式碼
這樣就實現了RocketMQ
的傳送訊息。
4. 啟動 Consumer 消費訊息
找到example
包的org.apache.rocketmq.example.quickstart.Consumer
類,這是一個最簡單的訊息消費者,我們來看一下它的原始碼。
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
複製程式碼
使用 DefaultMQPushConsumer
類來建立消費者例項,並指定訊息組Group
、路由中心地址、消費模式、訊息類別。註冊訊息監聽器,監聽訊息,消費訊息,返回消費成功標識。 啟動生產者例項。
21:24:03.482 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
Consumer Started.
ConsumeMessageThread_6 Receive New Messages: [MessageExt [brokerName=daxiongMac.local, queueId=2, storeSize=178, queueOffset=502, sysFlag=0, bornTimestamp=1591449756319, bornHost=/192.168.31.126:50803, storeTimestamp=1591449756321, storeHost=/192.168.31.126:10911, msgId=C0A81F7E00002A9F00000000000691E4, commitLogOffset=430564, bodyCRC=1565577195, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=750, CONSUME_START_TIME=1591449844575, UNIQ_KEY=C0A81F7E8D39330BEDB41E560E9F0009, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57], transactionId='null'}]]
ConsumeMessageThread_5 Receive New Messages: [MessageExt [brokerName=daxiongMac.local, queueId=1, storeSize=178, queueOffset=502, sysFlag=0, bornTimestamp=1591449756316, bornHost=/192.168.31.126:50803, storeTimestamp=1591449756317, storeHost=/192.168.31.126:10911, msgId=C0A81F7E00002A9F0000000000069132, commitLogOffset=430386, bodyCRC=710410109, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=750, CONSUME_START_TIME=1591449844576, UNIQ_KEY=C0A81F7E8D39330BEDB41E560E9C0008, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 56], transactionId='null'}]]
......
複製程式碼
至此,我們就完成了RocketMQ
的快速入門,啟動nameserver
和broker
,建立生產者傳送訊息,建立消費者接收訊息。
“版權宣告:本文為Planeswalker23所創,轉載請帶上原文連結,感謝。
本文已上傳個人公眾號,歡迎掃碼關注。
本文使用 mdnice 排版