【轻松上手 RocketMQ 专栏】延迟消息源码解析与场景分析

语言: CN / TW / HK

这一讲,我们主要来讲延迟消息。

这一次我们结合业务来讲。

业务背景

在电商中,下单后,有一些用户付款后,会主动退款。也有一些用户下单后,未付款。但是这部分未付款的订单,会占用着商品库存。

我们目前的电商App,下单后,会在订单表创建对应的订单数据。这些订单的状态,有一些是未付款的,但是未付款的订单占用着商品库存。为了让商品库存能正常恢复,我们现在的处理方案是:

  • 启动一个定时任务,每30分钟,定时扫描一遍订单表
  • 如果订单是已付款,则跳过,不处理
  • 如果订单是未付款,但未超过30分钟,不处理
  • 如果订单是未付款,且超过30分钟,就取消订单 (补充:取消订单,其实就是下单的逆向流程)

方案缺点

这个方案有什么缺点?

  • 第一,每次定时任务去扫描全部订单,但是订单未付款且超时30分钟的只有一小部分。就是做很多无用功。
  • 第二,如果订单表的数量超级超级大,这个时候,扫描的时间巨长,浪费cpu资源。
  • 第三,这样子频繁查询数据库,给数据库造成很多不必要的压力。

解决方案

那针对上述的缺点,我们有没有好的解决方案:

  • 第一,避免扫描全表
  • 第二,谁没付款,就去取消谁,不要做多余的动作
  • 第三,要保证近实时取消订单。(近实时:1s左右)

说了这么多,我摊牌了,不装了,就是为了引入RocketMQ的延迟消息

简单总结一下:创建订单的时候,发送一条延时30分钟的消息。到30分钟后,消费者拿到信息,再去判断订单是否已付款,如果付款就跳过不处理,没付款,那就取消订单。

这种方案:没有多余的扫描数据库操作;谁没付款,就去取消谁。多好呀!在生产上,赶紧用起来。

生产者

上面,介绍的都是方法论,下面就是具体的实操环节了。

下面,简单用一个demo介绍一下生产者

public class Producer {
    public static void main(String[] args) throws Exception{
        //生产者组
        DefaultMQProducer producer = new DefaultMQProducer("delay_producer_group");

        //设置nameserver
        producer.setNamesrvAddr("localhost:9876");
        //启动生产者
        producer.start();

        //构建消息
        Message message = new Message("delayTopic","TagA","delayMessage".getBytes(RemotingHelper.DEFAULT_CHARSET));
        // 重点:设置延迟级别
        message.setDelayTimeLevel(3);
        // 发送消息
        SendResult sendResult = producer.send(message);
        // 打印发送结果
        System.out.println("发送结果:"+sendResult);
        // 关闭生产者
        producer.shutdown();
    }
}

这里强调一下,不是延迟发送哈,是延迟消费。发送是立马就发送的,只是消费的时候,延迟30分钟。

补充知识点

延迟级别是从1开始的,不是从0开始。然后你可能会发现,最多延迟2小时。如果你想延迟3小时,对不起,RocketMQ不支持。告辞!!!

消费者

public class Consumer {
    public static void main(String[] args) throws Exception {
        // 消费者组
        DefaultMQPushConsumer consumer =  new DefaultMQPushConsumer("delay_consumer_group");
        //注册nameserver
        consumer.setNamesrvAddr("localhost:9876");

        // 订阅主题
        consumer.subscribe("delayTopic","TagA");

        // 开启消费offset
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);


        //监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (int i = 0; i < list.size(); i++) {
                    MessageExt messageExt = list.get(i);
                    String msg = new String(messageExt.getBody());
                    //这里主要打印延迟时间≈当前时间-消息的生产时间
                    System.out.println(msg+" 时间="+(System.currentTimeMillis()-messageExt.getBornTimestamp()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
    }
}

总结:延迟消费者和普通的消费者相同,一毛一样。延迟消息的核心点:生产者多了一个延迟级别。

知其然知其所以然

上面,你已经知道怎么使用了。

如果面试官问你:RocketMQ的延迟消息底层原理是什么?

那你接着看下去。

看图说话。

  • 第一,生产者发送的消息,因为带了延迟级别,因此会被分发到叫SCHEDULE_TOPIC_XXXX的Topic中。里面有18的队列,一个队列对应着一个延迟级别。比如queueId=delayLevel-1。

  • 第二,定时器,每100毫秒,扫描所有延迟级别里面的延迟消息message,如果消费时间已经大于当前时间,那定时器就会把延迟消息message,发送到真正的topic(就是代码写的topic,比如上面代码的:delayTopic),根据负载均衡策略,把message发送到具体某个队列。

  • 第三,有消息后,消费者进行消息和后续处理。

上面这里,是一个总体流程图。

然后,我们对照代码,来进一步深刻认识一下。其实,就是加深理解。

第一步:生产者发送的消息到SCHEDULE_TOPIC_XXXX的topic

org.apache.rocketmq.store.CommitLog#putMessage
        //真正的topic
        String topic = msg.getTopic();
        //真正的队列Id
        int queueId = msg.getQueueId();

        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // 延迟级别大于0
            if (msg.getDelayTimeLevel() > 0) {
                // 如果延迟级别大于最大延迟级别,那就把延迟级别设置为最大延迟级别
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }
                // 延迟topicSCHEDULE_TOPIC_XXXX
                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                // 根据延迟级别,获取队列id
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
                // 消息的topic设置为延迟topic,不是设置真正的topic
                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

        省略部分封装msg的代码..
            //最后把msg追加到mappedFile上,mappedFile这个后续再讲,在这里你把它当做一个文件即可
            result = mappedFile.appendMessage(msg, this.appendMessageCallback);

第二步:定时器扫描信息

  • 1,org.apache.rocketmq.store.schedule.ScheduleMessageService#start
public void start() {
        //通过AtomicBoolean 来确保 有且仅有一次执行start方法
        if (started.compareAndSet(falsetrue)) {
            this.timer = new Timer("ScheduleMessageTimerThread"true);
            // 遍历所有 延迟级别
            for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
                // key为延迟级别
                Integer level = entry.getKey();
                // value 为 毫秒数
                Long timeDelay = entry.getValue();
                // 根据延迟级别 ,获取对应的offset
                Long offset = this.offsetTable.get(level);
                //
                if (null == offset) {
                    offset = 0L;
                }
                // 为每个延迟级别创建定时任务,开始执行定时任务,1S后开始执行
                if (timeDelay != null) {
                    // 第二步:具体核心执行逻辑在DeliverDelayedMessageTimerTask-->executeOnTimeup()
                    this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
                }
            }
            // 延迟10秒后执行定时任务,flushDelayOffsetInterval=10s,周期也是10秒执行一次
            this.timer.scheduleAtFixedRate(new TimerTask() {

                @Override
                public void run() {
                    try {
                        //持久化每个队列的消费offset
                        if (started.get()) ScheduleMessageService.this.persist();
                    } catch (Throwable e) {
                        log.error("scheduleAtFixedRate flush exception", e);
                    }
                }
            }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
        }
    }

2,org.apache.rocketmq.store.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup

public void executeOnTimeup() {
            //根据延迟级别和topic:RMQ_SYS_SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";来找到对应的ConsumeQueue
            ConsumeQueue cq =
                ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel));
            // 消费偏移量
            long failScheduleOffset = offset;

            if (cq != null) {
                // 根据消费偏移量从消息队列中获取所有有效消息
                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
                if (bufferCQ != null) {
                    try {
                        long nextOffset = offset;
                        int i = 0;
                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                        // 遍历所有消息
                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            // 获取消息的物理偏移量
                            long offsetPy = bufferCQ.getByteBuffer().getLong();
                            // 获取消息的物理长度
                            int sizePy = bufferCQ.getByteBuffer().getInt();
                            long tagsCode = bufferCQ.getByteBuffer().getLong();

                          
                            //当前时间
                            long now = System.currentTimeMillis();
                            //消费时间
                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
                            //下一个偏移量
                            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                            //如果消费时间<当前时间,说明应该被消费了
                            long countdown = deliverTimestamp - now;

                            if (countdown <= 0) {
                                //根据物理偏移量和长度,获取消息
                                MessageExt msgExt =
                                    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                        offsetPy, sizePy);

                                if (msgExt != null) {
                                    try {
                                        //构建真正 的消息
                                        MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                        
                                        // 重新把消息发送到真正的消息队列上
                                        PutMessageResult putMessageResult =
                                            ScheduleMessageService.this.writeMessageStore
                                                .putMessage(msgInner);
                                  ...省略一堆不太重要的代码
                                       }
            //这里又重新添加一个新的任务,这次是100毫秒
            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                failScheduleOffset), DELAY_FOR_A_WHILE);
        }

第三步: 消费者后续处理(略)

最后用一张图来总结

好了,写完了,下期见,拜拜。

有问题的话,欢迎留言交流。

每日一问

RocketMQ不支持自定义延迟时间,那Kafka支持延迟消息吗?如果支持,支持自定义延迟时间吗?如要你实现自定义延迟时间,你会怎么实现?说说你的思路

欢迎留言

后续文章


本文分享自微信公众号 - RocketMQ官微(ApacheRocketMQ)。
如有侵权,请联系 [email protected] 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。