后端程序员也浪漫,用消息队列实现一个发随机发消息的七夕礼物
highlight: a11y-dark theme: scrolls-light
我正在参加「创意开发 投稿大赛」详情请看:掘金创意开发大赛来了!
都在过情人节,前端的小哥哥们给女朋友画个页面,美美的,写个chrome插件,好看的,俺们后端程序员咋办。
我给媳妇写首诗,哈哈
我决定,把想对媳妇说的,今天发送到一个MQ里边,然后在七夕当天,打开消费者,将这一段话给俺媳妇看。你看,这就是我好久前对你说的话,这就是我们后端程序员的浪漫。当然也可以多发送几个,到时候跟根据topic
控制到底发什么,哈哈。
这里首先得用顺序消息,当然,消息过期时间得设置的长一点。
1 下载并启动RocketMQ
点击下载,这是个windows版本的。
http://archive.apache.org/dist/rocketmq/4.8.0/rocketmq-all-4.8.0-bin-release.zip
下载完成解压后长这样:
然后后还需要配置环境变量
这个时候就可以进入到RocketMQ的bin目录启动MQ了
1.1 首先启动name server
js
start mqnamesrv.cmd
1.2 然后启动Broker
js
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
这个时候就启动成功了
2 生产者
需要注意的是,消息必须是顺序消息,不然发送的消息顺序就乱了。一首情诗顺序乱了,读不下去,岂不是很尴尬。 ```java import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import java.util.List;
public class RocketMQOrderProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String[] tags = new String[]{"TagA", "TagC", "TagD"};
//读取文件
List<String> messages = getMessages();
for (int i = 0; i < messages.size(); i++) {
String body = messages.get(i);
Message msg = new Message("topic_luke", tags[i % tags.length], "KEY" + i, body.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = ((Integer)arg).longValue();
long index = id % mqs.size();
return mqs.get((int) index);
}
}, i);
}
producer.shutdown();
}
static List<String> getMessages() throws IOException {
String temp = null;
File f = new File("E:\Code\online-taxi-three\demo\luke.txt");
InputStreamReader read = new InputStreamReader(new FileInputStream(f));
ArrayList readList = new ArrayList();
BufferedReader reader = new BufferedReader(read);
while ((temp = reader.readLine()) != null && !"".equals(temp)) {
readList.add(temp);
}
return readList;
}
} ```
3 消费者
这里需要注意的是setConsumeThreadMax
和setConsumeThreadMin
都需要设置成1,单线程取消息这样就可以通过sleep控制消息的显示速度,不然并发取消息就很快显示完了。不够唯美。
```java
import lombok.SneakyThrows;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List; import java.util.concurrent.TimeUnit;
public class RockerMQConsumer {
public static void main(String[] args) throws Exception {
//实例化消息消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke");
//指定nameserver地址
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeThreadMax(1);
consumer.setConsumeThreadMin(1);
consumer.setPullBatchSize(1);
//订阅topic
consumer.subscribe("topic_luke","*");
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@SneakyThrows
@Override
public ConsumeConcurrentlyStatus consumeMessage(List