RabbitMQ 延遲佇列,太實用了!

語言: CN / TW / HK

作者:海向
出處:https://www.cnblogs.com/haixiang/p/10966985.html

應用場景

目前常見的應用軟體都有訊息的延遲推送的影子,應用也極為廣泛,例如:

  • 淘寶七天自動確認收貨。在我們簽收商品後,物流系統會在七天後延時傳送一個訊息給支付系統,通知支付系統將款打給商家,這個過程持續七天,就是使用了訊息中介軟體的延遲推送功能。
  • 12306 購票支付確認頁面。我們在選好票點選確定跳轉的頁面中往往都會有倒計時,代表著 30 分鐘內訂單不確認的話將會自動取消訂單。其實在下訂單那一刻開始購票業務系統就會發送一個延時訊息給訂單系統,延時30分鐘,告訴訂單系統訂單未完成,如果我們在30分鐘內完成了訂單,則可以通過邏輯程式碼判斷來忽略掉收到的訊息。

在上面兩種場景中,如果我們使用下面兩種傳統解決方案無疑大大降低了系統的整體效能和吞吐量:

  • 使用 redis 給訂單設定過期時間,最後通過判斷 redis 中是否還有該訂單來決定訂單是否已經完成。這種解決方案相較於訊息的延遲推送效能較低,因為我們知道 redis 都是儲存於記憶體中,我們遇到惡意下單或者刷單的將會給記憶體帶來巨大壓力。
  • 使用傳統的資料庫輪詢來判斷資料庫表中訂單的狀態,這無疑增加了IO次數,效能極低。
  • 使用 jvm 原生的 DelayQueue ,也是大量佔用記憶體,而且沒有持久化策略,系統宕機或者重啟都會丟失訂單資訊。

訊息延遲推送的實現

在 RabbitMQ 3.6.x 之前我們一般採用死信佇列+TTL過期時間來實現延遲佇列,我們這裡不做過多介紹,可以參考之前文章來了解:TTL、死信佇列。

在 RabbitMQ 3.6.x 開始,RabbitMQ 官方提供了延遲佇列的外掛,可以下載放置到 RabbitMQ 根目錄下的 plugins 下。

首先我們建立交換機和訊息佇列,application.properties 中配置與上一篇文章相同。

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MQConfig {

    public static final String LAZY_EXCHANGE = "Ex.LazyExchange";
    public static final String LAZY_QUEUE = "MQ.LazyQueue";
    public static final String LAZY_KEY = "lazy.#";

    @Bean
    public TopicExchange lazyExchange(){
        //Map<String, Object> pros = new HashMap<>();
        //設定交換機支援延遲訊息推送
        //pros.put("x-delayed-message", "topic");
        TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Queue lazyQueue(){
        return new Queue(LAZY_QUEUE, true);
    }

    @Bean
    public Binding lazyBinding(){
        return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY);
    }
}

我們在 Exchange 的宣告中可以設定exchange.setDelayed(true)來開啟延遲佇列,也可以設定為以下內容傳入交換機宣告的方法中,因為第一種方式的底層就是通過這種方式來實現的。

//Map<String, Object> pros = new HashMap<>();
//設定交換機支援延遲訊息推送
//pros.put("x-delayed-message", "topic");
TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);

傳送訊息時我們需要指定延遲推送的時間,我們這裡在傳送訊息的方法中傳入引數 new MessagePostProcessor() 是為了獲得 Message物件,因為需要藉助 Message物件的api 來設定延遲時間。

import com.anqi.mq.config.MQConfig;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class MQSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //confirmCallback returnCallback 程式碼省略,請參照上一篇
  
    public void sendLazy(Object message){
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        //id + 時間戳 全域性唯一
        CorrelationData correlationData = new CorrelationData("12345678909"+new Date());

        //傳送訊息時指定 header 延遲時間
        rabbitTemplate.convertAndSend(MQConfig.LAZY_EXCHANGE, "lazy.boot", message,
                new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //設定訊息持久化
                message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                //message.getMessageProperties().setHeader("x-delay", "6000");
                message.getMessageProperties().setDelay(6000);
                return message;
            }
        }, correlationData);
    }
}

我們可以觀察 setDelay(Integer i)底層程式碼,也是在 header 中設定 x-delay。等同於我們手動設定 header

message.getMessageProperties().setHeader("x-delay", "6000");
/**
 * Set the x-delay header.
 * @param delay the delay.
 * @since 1.6
 */
public void setDelay(Integer delay) {
	if (delay == null || delay < 0) {
		this.headers.remove(X_DELAY);
	}
	else {
		this.headers.put(X_DELAY, delay);
	}
}

消費端進行消費

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

@Component
public class MQReceiver {

    @RabbitListener(queues = "MQ.LazyQueue")
    @RabbitHandler
    public void onLazyMessage(Message msg, Channel channel) throws IOException{
        long deliveryTag = msg.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag, true);
        System.out.println("lazy receive " + new String(msg.getBody()));

    }

測試結果

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
public class MQSenderTest {

    @Autowired
    private MQSender mqSender;

    @Test
    public void sendLazy() throws  Exception {
        String msg = "hello spring boot";

        mqSender.sendLazy(msg + ":");
    }
}

果然在 6 秒後收到了訊息 lazy receive hello spring boot:

近期熱文推薦:

1.600+ 道 Java面試題及答案整理(2021最新版)

2.終於靠開源專案弄到 IntelliJ IDEA 啟用碼了,真香!

3.阿里 Mock 工具正式開源,幹掉市面上所有 Mock 工具!

4.Spring Cloud 2020.0.0 正式釋出,全新顛覆性版本!

5.《Java開發手冊(嵩山版)》最新發布,速速下載!

覺得不錯,別忘了隨手點贊+轉發哦!