RabbitMQ 入門系列:10、擴充套件內容:延時佇列:延時佇列外掛及其有限的適用場景(系列大結局)。

語言: CN / TW / HK

延遲佇列用於事件發生後間隔一段時間後需要做特定處理的場景,如:

1、電商支付系統中,使用者下單後N分鐘不支付,自動取消訂單。
2、使用者瀏覽商品長時間後還沒下單,後續推送相關產品和優惠券。
3、使用者註冊或修改生日後:生日簡訊推送等。
4、7天后的自動確認收貨等。。。
......

對於這類應用,其實用訊息佇列,對個要求要求,是合適的,但對整個系統應用而言,它是不靠譜的。

訊息佇列的核心應用,是保持記憶體的佇列,不斷的產生並不斷的消耗,最佳狀態的保持系統的穩定和流暢。

而延遲佇列的核心,是積壓訊息,還是大量積壓,這明顯與訊息佇列的設計就不符合。

下面來看看網上官方的延時佇列插入:

1、延時佇列外掛:下載解壓與啟用

1、下載地址: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

2、解壓到RabbitMQ安裝目錄下的plugins目錄下

3、啟動:命令列執行以下命令:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

4、重啟RabbitMQ:

多了個x-delayed-message則為安裝成功。

2、延時佇列外掛:實現原理

3、延時佇列外掛:編碼實現

其實,佇列一直還是那個普通的佇列,只是多了一個交換機的型別是延時型別。

因此,原理就是把訊息,設定過期時間,發給延時交換機,它自己存著,到時間了,會轉發到佇列去。

程式碼如下:

using (var channel = Rabbit.Instance.DefaultConnection.CreateModel())
{

    //定義佇列
    channel.QueueDeclare("dead");
   
   //定義延時路由
    IDictionary<string, object> dic = new Dictionary<string, object>();
    dic.Add("x-delayed-type", "direct");
    channel.ExchangeDeclare("ex-dead", "x-delayed-message", arguments: dic);

    channel.QueueBind("dead", "ex-dead", "dead");

    var p1 = channel.CreateBasicProperties();
    IDictionary<string, object> header = new Dictionary<string, object>();
    header.Add("x-delay", 6000);
    p1.Headers = header;
    var p2 = channel.CreateBasicProperties();
    IDictionary<string, object> header2 = new Dictionary<string, object>();
    header2.Add("x-delay", 16000);
    p2.Headers = header2;
    channel.BasicPublish("ex-dead", "dead", false, p1, Encoding.UTF8.GetBytes("6秒就過期了1。"));
    channel.BasicPublish("ex-dead", "dead", false, p2, Encoding.UTF8.GetBytes("16秒就過期了2。"));

}

執行,等了16秒,終於等來了資料:

4、延時佇列外掛:適合應用場景

如果說,列信佇列的適合場景,是短時間的固定間隔時間。

那麼說,延時佇列外掛的適合場景,就是更進一步的短時間內的隨機時間。

劃重點的話,還是取決於積壓的資料量。

官方的吐槽說明,目前的設計,由於儲存資料庫設計不佳,該外掛推薦積壓資料在100萬條資料以下適合應用。

總結:

使用延時佇列,應該考量業務積壓的資料量,如果資料量小,那麼都不是問題。

如果資料量大,那麼建立佇列資料庫,按時間存檔要傳送的資料,定時掃描處理更合適。