後端程式設計師也浪漫,用訊息佇列實現一個發隨機發訊息的七夕禮物
highlight: a11y-dark theme: scrolls-light
我正在參加「創意開發 投稿大賽」詳情請看:掘金創意開發大賽來了!
都在過情人節,前端的小哥哥們給女朋友畫個頁面,美美的,寫個chrome外掛,好看的,俺們後端程式設計師咋辦。
我給媳婦寫首詩,哈哈
我決定,把想對媳婦說的,今天傳送到一個MQ裡邊,然後在七夕當天,開啟消費者,將這一段話給俺媳婦看。你看,這就是我好久前對你說的話,這就是我們後端程式設計師的浪漫。當然也可以多傳送幾個,到時候跟根據topic
控制到底發什麼,哈哈。
這裡首先得用順序訊息,當然,訊息過期時間得設定的長一點。
1 下載並啟動RocketMQ
點選下載,這是個windows版本的。
http://archive.apache.org/dist/rocketmq/4.8.0/rocketmq-all-4.8.0-bin-release.zip
下載完成解壓後長這樣:
然後後還需要配置環境變數
這個時候就可以進入到RocketMQ的bin目錄啟動MQ了
1.1 首先啟動name server
js
start mqnamesrv.cmd
1.2 然後啟動Broker
js
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
這個時候就啟動成功了
2 生產者
需要注意的是,訊息必須是順序訊息,不然傳送的訊息順序就亂了。一首情詩順序亂了,讀不下去,豈不是很尷尬。 ```java import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import java.util.List;
public class RocketMQOrderProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String[] tags = new String[]{"TagA", "TagC", "TagD"};
//讀取檔案
List<String> messages = getMessages();
for (int i = 0; i < messages.size(); i++) {
String body = messages.get(i);
Message msg = new Message("topic_luke", tags[i % tags.length], "KEY" + i, body.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = ((Integer)arg).longValue();
long index = id % mqs.size();
return mqs.get((int) index);
}
}, i);
}
producer.shutdown();
}
static List<String> getMessages() throws IOException {
String temp = null;
File f = new File("E:\Code\online-taxi-three\demo\luke.txt");
InputStreamReader read = new InputStreamReader(new FileInputStream(f));
ArrayList readList = new ArrayList();
BufferedReader reader = new BufferedReader(read);
while ((temp = reader.readLine()) != null && !"".equals(temp)) {
readList.add(temp);
}
return readList;
}
} ```
3 消費者
這裡需要注意的是setConsumeThreadMax
和setConsumeThreadMin
都需要設定成1,單執行緒取訊息這樣就可以通過sleep控制訊息的顯示速度,不然併發取訊息就很快顯示完了。不夠唯美。
```java
import lombok.SneakyThrows;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List; import java.util.concurrent.TimeUnit;
public class RockerMQConsumer {
public static void main(String[] args) throws Exception {
//例項化訊息消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke");
//指定nameserver地址
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeThreadMax(1);
consumer.setConsumeThreadMin(1);
consumer.setPullBatchSize(1);
//訂閱topic
consumer.subscribe("topic_luke","*");
// 註冊回撥實現類來處理從broker拉取回來的訊息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@SneakyThrows
@Override
public ConsumeConcurrentlyStatus consumeMessage(List