RabbitMQ高階特性:死信佇列

語言: CN / TW / HK

開啟掘金成長之旅!這是我參與「掘金日新計劃 · 12 月更文挑戰」的第23天,點選檢視活動詳情

什麼是死信佇列?

死信佇列,英文縮寫DLX,Dead Letter Exchange(死信交換機),當訊息成為Dead message(訊息過期)後,可以被重新發送到另一個交換機,這個交換機就算是DLX,其實死信交換機(佇列)和正常交換機(佇列)沒有什麼區別

為什麼叫死信佇列但是翻譯過來叫死信交換機呢,因為RabbitMQ比較特殊,其他MQ只有佇列沒有交換機這個概念的

正常來說,佇列設定了過期時間,當訊息到了佇列之後,在過期時間內沒有被消費,那麼這個訊息就被丟棄了,但是如果這個佇列綁定了一個DLX死信佇列(交換機),那麼就算訊息過期了也不會被直接丟棄掉,而是會發送給死信交換機,那麼死信交換機又可以繫結其他佇列,將這些訊息儲存到其他佇列,從而又可以進行訊息消費,就算這個意思,過程如圖所示

什麼情況下訊息成為死信佇列

訊息成為死信佇列的三種情況

1 佇列訊息長度達到限制

比如說給佇列最大儲存長度為10,當11條訊息進來的時候,第11條訊息進不去了,那麼第11條訊息就是死信

2 消費者拒絕消費訊息,basicNack/basicReject,並且不把訊息重新放入原目標佇列,requeue=false,

消費者使用basicNack/basicReject,並且requeue=false,表示消費者拒絕重新消費該訊息

3 原佇列存在訊息過期設定,訊息到達超時時間未被消費

原來的佇列存在過期時間,但是到了過期時間還沒有消費該訊息

佇列繫結交換機的方式

給佇列設定兩個引數 x-dead-letter-exchange(設定交換機的名稱)和x-dead-letter-routing-key(傳送訊息時指定的routingkey)

死信佇列程式碼實現

宣告兩套交換機佇列,一套是正常的交換機佇列,一套是死信的交換機佇列

1 宣告正常佇列和交換機test_queue_dlx test_exchange_dlx

2 宣告死信佇列和死信交換機 queue_dlx exchange_dlx

3 正常佇列繫結死信交換機

設定兩個引數

1死信交換機名稱 x-dead-letter-exchange

2傳送給死信交換機的路由鍵 x-dead-letter-routing-key

1 訊息過期時間的測試

測試過期時間的死信 給正常的交換機發送訊息,過了存活時間訊息自動從正常佇列跑到死信佇列

2 佇列長度限制的測試

如果傳送成功 那麼因為設定了最大長度是10,只會有10條進行正常佇列剩下的會跑到死信佇列,過了10s後正常佇列中的訊息也會自動跑到死信佇列中

3 消費者訊息拒收的限制

消費者Consumer監聽正常的佇列,然後讓訊息拒絕接收並且不重回佇列

由於消費者拒收訊息,訊息會直接跑到私信佇列中

最終相關測試程式碼

生產者配置

```

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xmlns:context="http://www.springframework.org/schema/context"

xmlns:rabbit="http://www.springframework.org/schema/rabbit"

xsi:schemaLocation="http://www.springframework.org/schema/beans

http://www.springframework.org/schema/beans/spring-beans.xsd

http://www.springframework.org/schema/context

https://www.springframework.org/schema/context/spring-context.xsd

http://www.springframework.org/schema/rabbit

http://www.springframework.org/schema/rabbit/spring-rabbit.xsd" >

<!--載入配置檔案-->

<!-- 定義rabbitmq connectionFactory -->

<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"

port="${rabbitmq.port}"

username="${rabbitmq.username}"

password="${rabbitmq.password}"

virtual-host="${rabbitmq.virtual-host}"

publisher-confirms="true"

publisher-returns="true"

/>

<!--定義管理交換機,佇列-->

<!--定義rabbitTemplate物件操作可以在程式碼中呼叫api傳送訊息-->

<!--訊息的可靠性投遞(生產端)-->

<!--交換機 廣播-->

    <!--繫結queue-->

        <rabbit:binding queue="test_queue_confirm" key="confirm" ></rabbit:binding>

    </rabbit:bindings>

</rabbit:direct-exchange>



<!--ttl訊息存活時間-->

        <!--設定queue佇列存活時間-->

        <!--通過entry設定屬性 鍵值對應的格式 key=鍵 value=值 value-type表示值的資料型別-->

    </rabbit:queue-arguments>

</rabbit:queue>



<!--宣告交換機-->

    <!--繫結交換機和佇列-->

        <rabbit:binding pattern="ttl.#" queue="test_queue_ttl" ></rabbit:binding>

    </rabbit:bindings>

</rabbit:topic-exchange>





<!--

死信佇列

1 宣告正常佇列和交換機test_queue_dlx test_exchange_dlx

2 宣告死信佇列和死信交換機 queue_dlx exchange_dlx

3 正常佇列繫結死信交換機

設定兩個引數

1死信交換機名稱 x-dead-letter-exchange

2傳送給死信交換機的路由鍵 x-dead-letter-routing-key

-->

    <!-- 3正常佇列繫結死信交換機 使用arguments和entry設定引數-->

        <!--3.1死信交換機名稱  x-dead-letter-exchange-->

        <!--3.2傳送給死信交換機的路由鍵 x-dead-letter-routing-key-->

        <!--4讓訊息完成死信-->

        <!--4.2 設定佇列的長度限制 max_length-->

    </rabbit:queue-arguments>

</rabbit:queue>

<!--設定正常的交換機-->

    <!--正常交換機繫結正常佇列-->

        <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx" />

    </rabbit:bindings>

</rabbit:topic-exchange>



<!-- 2 宣告死信佇列和死信交換機  queue_dlx exchange_dlx-->

<!--設定正常的交換機-->

    <!--正常交換機繫結正常佇列-->

        <rabbit:binding pattern="dlx.#" queue="queue_dlx" />

    </rabbit:bindings>

</rabbit:topic-exchange>

```

消費者配置

```

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xmlns:context="http://www.springframework.org/schema/context"

xmlns:rabbit="http://www.springframework.org/schema/rabbit"

xsi:schemaLocation="http://www.springframework.org/schema/beans

http://www.springframework.org/schema/beans/spring-beans.xsd

http://www.springframework.org/schema/context

https://www.springframework.org/schema/context/spring-context.xsd

http://www.springframework.org/schema/rabbit

http://www.springframework.org/schema/rabbit/spring-rabbit.xsd" >

<!--載入配置檔案-->

<!-- 定義rabbitmq connectionFactory -->

<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"

port="${rabbitmq.port}"

username="${rabbitmq.username}"

password="${rabbitmq.password}"

virtual-host="${rabbitmq.virtual-host}"

/>

<!--配置監聽器bean 掃描這個包的路徑 配置類需要加註解-->

<!--定義監聽器    acknowledge="" 設定簽收方式 -->

    <!--載入對應監聽器的類 具體的類名和監聽的佇列名-->

</rabbit:listener-container>

```

生產者傳送訊息

```

package com.producer.test;

import org.junit.Test;

import org.junit.runner.RunWith;

import org.springframework.amqp.AmqpException;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.core.MessagePostProcessor;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.test.context.ContextConfiguration;

import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**

  • @prog ram: SpringBoot_RabbitMQ_Advanced

  • @description: 測試確認模式訊息是否傳送成功

  • @author: 魏一鶴

  • @createDate: 2022-04-04 23:10

**/

//spring配置檔案

@RunWith(SpringJUnit4ClassRunner.class)

//載入檔案的路徑

@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml" )

public class ProducerTest {

//注入 RabbitTemplate

@Autowired

private RabbitTemplate rabbitTemplate;



/**
  • 確認模式:

  • 步驟

  • 1確認模式的開啟:在connectionFactory中開啟,預設是false不開啟的 publisher-confirms="true"

  • 2回撥函式的編寫:在RabbitTemplate模板工具類定義ConfirmCallBack(回撥函式).當訊息傳送出去的時候回撥函式會自動執行,返回true(成功)或者false(失敗)

**/

@Test

public void testConfirm() {

    //定義確認模式的回撥函式

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

        @Override

        //匿名內部類

/**

  • confirm有三個引數 下面一一說明

  • CorrelationData correlationData 相關的配置資訊

  • boolean ack exchange交換機是否成功收到了訊息 true成功false失敗

  • String cause 失敗原因 ack=false

**/

public void confirm(CorrelationData correlationData, boolean ack, String cause) {

            System.out.println( "確認模式的回撥函式被執行了!" ); //確認模式的回撥函式被執行了!

System.out.println( "訊息是否傳送成功?" +ack);

            if(ack){

                //交換機接收成功生產者傳送的訊息

System.out.println( "接收成功訊息!原因是:" +cause);

            }else{

                //交換機接收沒有成功生產者傳送的訊息

System.out.println( "接收失敗訊息!原因是:" +cause);

                //將來會做處理,就算訊息傳送失敗也會重新去傳送訊息,保證訊息第二次傳送成功

}

        }

    });

    //傳送訊息

rabbitTemplate.convertAndSend( "test_exchange_confirm" , "confirm" , "message confirm..." );

}





/**
  • 回退模式:當訊息傳送給Exchange交換機後,交換機路由到queue失敗時才會執行ReturnCallBack

  • 步驟

  • 1回退模式的開啟:在connectionFactory中開啟,預設是false不開啟的 publisher-returns="true"

  • 2回撥函式的編寫:在RabbitTemplate模板工具類定義ConfirmCallBack(回撥函式).當訊息傳送出去的時候回撥函式會自動執行,返回true(成功)或者false(失敗)

  • 3設定Exchange處理訊息的模式 它有兩種模式

  • 3.1 第一種模式:如果訊息沒有路由到queue佇列,則會丟棄訊息(預設的方式)

  • 3.2 第二種模式:如果訊息沒有路由到queue佇列,則返回給訊息的傳送方ReturnCallBack

**/

@Test

public void testReturn() {



    //由於處理訊息的模式預設是如果訊息沒有路由到queue佇列,則會丟棄訊息

//所以需要設定交換機處理訊息的模式,交換機會把訊息返回給對應的生產者,生產者通過監聽就能拿到訊息

rabbitTemplate.setMandatory(true);

    //編寫returnCallBack回撥函式

rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback(){

        @Override

        //匿名內部類

/**

  • returnedMessage有五個引數 下面一一說明

  • Message message 訊息物件

  • int replyCode 返回編碼 錯誤碼

  • String replyText 錯誤資訊

  • String exchange 交換機

  • String routingKey 路由鍵

**/

public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

            System.out.println( "返回模式的回撥函式被執行了!" );

            System.out.println( "訊息物件:" +message);

            System.out.println( "返回編碼(錯誤碼):" +replyCode);

            System.out.println( "錯誤資訊:" +replyText);

            System.out.println( "交換機:" +exchange);

            System.out.println( "路由鍵:" +routingKey);

            //將來會做處理 把資訊重新路由

//-----------------------------------------列印資訊

//返回模式的回撥函式被執行了!

//訊息物件:(Body:'message confirm...' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])

//返回編碼(錯誤碼):312

//錯誤資訊:NO_ROUTE

//交換機:test_exchange_confirm

//路由鍵:confirm111

}

    });

    //傳送訊息

rabbitTemplate.convertAndSend( "test_exchange_confirm" , "confirm" , "message confirm..." );

}



//專門測試限流迴圈傳送訊息

@Test

public void testSend() {

    for (int i = 0; i < 10; i++) {

        //傳送訊息

rabbitTemplate.convertAndSend( "test_exchange_confirm" , "confirm" , "message confirm..." );

    }

}



//測試ttl

/**

  • ttl 過期時間分為兩種

  • 1佇列統一過期

  • 2訊息單獨過期

  • 如果設定了訊息的過期時間,也設定了佇列的過期時間,它以時間短的為準!

  • 如果佇列過期後,會將佇列所有訊息全部移除,因為是統一的

  • 訊息過期後,只有訊息在佇列頂端(快要被消費),才會判斷其是否過期,如果過期就會被移除掉

**/

@Test

public void testTTL() {

    //訊息單獨過期 在convertAndSend方法中心新增一個引數 MessagePostProcessor 載入匿名內部類和它的重寫方法

//MessagePostProcessor 訊息處理物件 設定一些訊息引數資訊

//傳送訊息

for (int i = 0; i < 10; i++) {

        if(i==5){

            //過期的訊息

rabbitTemplate.convertAndSend( "test_exchange_ttl" , "ttl.weiyihe" , "message ttl..." ,new MessagePostProcessor(){

                @Override

                public Message postProcessMessage(Message message) throws AmqpException {

                    //設定message的資訊  setExpiration 設定訊息過期時間

message.getMessageProperties().setExpiration( "10000" );

                    //返回該訊息

return message;

                }

            });

        }else{

            //不過期的訊息

rabbitTemplate.convertAndSend( "test_exchange_ttl" , "ttl.weiyihe" , "message ttl..." );

        }

    }

}





/**
  • 傳送測試私信訊息

  • 1、訊息過期時間的測試

  • 2、佇列長度限制的測試

  • 3、消費者訊息拒收的限制

**/

@Test

public void testDlx(){

    //1、測試過期時間的死信 給正常的交換機發送訊息

//如果訊息正常傳送到正常的交換機,過了10s會自動去死信佇列

//傳送訊息

//rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.haha", "我是一條訊息,我會死嗎?");

//2、佇列長度限制的測試

//傳送20條資訊 現在x-max-length是10

//如果傳送成功 那麼因為設定了最大長度是10,只會有10條進行正常佇列

// 剩下的會跑到死信佇列,過了10s後正常佇列中的訊息也會自動跑到死信佇列中

//for (int i = 0; i < 20; i++) {

// //傳送訊息

// rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.haha", "我是一條訊息,我會死嗎?");

//}

//3 消費者訊息拒收的限制

//消費者Consumer監聽正常的佇列,然後讓訊息拒絕接收並且不重回佇列

rabbitTemplate.convertAndSend( "test_exchange_dlx" , "test.dlx.haha" , "我是一條訊息,我會死嗎?" );

}

} ```

消費者拒收訊息

``` package com.wyh.listener;

import com.rabbitmq.client.Channel;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;

import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**

  • @program: SpringBoot_RabbitMQ_Advanced

  • @description: RabbitMQ 死信通訊

  • @author: 魏一鶴

  • @createDate: 2022-04-06 20:30

**/

//包掃描註解 把bean載入到spring容器中

@Component

//實現MessageListener介面並重寫onMessage方法

public class DLXCKListener implements ChannelAwareMessageListener {

@Override

public void onMessage(Message message, Channel channel) throws Exception {

    long deliveryTag = message.getMessageProperties().getDeliveryTag();

    try {

        //1 接收並列印消費者收到(消費)的訊息

System.out.println(new String(message.getBody()));

        //2 處理業務邏輯

System.out.println( "處理業務邏輯......" );

        //故意使得業務報錯

int num=3/0;

        channel.basicAck(deliveryTag,true);

    } catch (Exception e) {

        System.out.println( "出現異常,消費者拒絕簽收!" );

        //死信佇列 拒絕簽收requeue=false 將訊息路由到死信佇列中

channel.basicNack(deliveryTag,true,false);

    }

}

} ```

DLX小結

1 死信交換機(佇列)和普通交換機(佇列)沒什麼區別

2 當訊息成為死信後,如果該佇列綁定了死信佇列,那麼該訊息就會被死信交換機路由到死信佇列,如果沒有繫結死信佇列,那麼訊息就會根據訊息過期時間丟失,就不會被消費

成為死信佇列的三種情況

1 佇列訊息長度達到最大限制

2 訊息者拒收訊息,並且不允許重回路由

3 原佇列設定過期時間,訊息到達超時時間未被消費