.Net Core&RabbitMQ訊息轉發可靠機制

語言: CN / TW / HK

前言

生產者傳送訊息到了佇列,佇列推送資料給了消費者,這裡存在一些問題需要思考下

  • 生產者如何確保訊息一定投遞到了佇列中

  • RabbitMQ 丟失了訊息(下文暫不涉及這塊)

  • 佇列如何確保消費者收到了訊息呢

生產者可靠傳送

執行流程

當生產者將訊息傳送出去後,如果不進行特殊設定,預設情況下,傳送訊息操作後是沒有返回任何訊息給生產者的,這時生產者是不知道訊息有沒有真的到達了RabbitMQ,訊息可能在到達RabbitMQ前已經丟了。

對於這種情況,有兩種解決方案來應對,

  • 事務機制,該機制是AMQP協議層面提供的解決方案

  • 傳送方確認機制,這是RabbitMQ提供的解決方案

事務機制

可以將當前使用的channel設定成事務模式。藉助如下與事務機制相關的三個方法來完成事務操作。

  • channel.TxSelect() 啟動事務模式

  • channel.TxCommit() 提交事務

  • channel.TxRollback() 回滾事務

TXCommit

事務模式下,時序圖中釋出訊息前後增加了一點動作,設定通道為事務模式與提交事務。如果TXCommit()呼叫成功,則說明事務提交成功,訊息一定到達了RabbitMQ中。

TXRollback

當事務提交執行之前,由於RabbitMQ異常崩潰或其他原因丟擲異常,則可以捕獲異常,再執行TXRollback(),完成事務回滾。

生產者程式碼

在原有Basic程式碼基礎上增加事務操作,此處先註釋掉與事務有關的程式碼(16、34和40行),並且在傳送訊息後設置異常模擬

var connFactory = new ConnectionFactory
{
    HostName = "xxx.xxx.xxx.xxx",
    Port = 5672,
    UserName = "rabbitmqdemo",
    Password = "[email protected]",
    VirtualHost = "rabbitmqdemo"};
using (var conn = connFactory.CreateConnection())
{
    using (var channel = conn.CreateModel())
    {
        var queueName = "helloworld_tx";
        channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
        //channel.TxSelect();        while (true)
        {
            Console.WriteLine("訊息內容(exit退出):");
            var message = Console.ReadLine();
            if (message.Trim().ToLower() == "exit")
            {
                break;
            }

            try
            {
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
                if (message.Trim().ToLower() == "9527")
                {
                    throw new Exception("觸發異常");
                }
                //channel.TxCommit();                Console.WriteLine("訊息內容傳送完畢:" + message);
            }
            catch (Exception ex)
            {
                Console.WriteLine("訊息傳送出現異常:" + ex.Message);
                //channel.TxRollback();            }
        }
    }
}

消費端程式碼保持不變(僅改動下佇列名稱)

因生產者已經將訊息傳送到了佇列中,後觸發了異常,實則已經和訊息能不能到達佇列無關了,在消費端也就正常消費了。

取消16、34和40行的註釋後,在事務模式下,當再次觸發異常後,能夠看見,消費端並沒有收到訊息(這不是說消費端那邊出現異常了,而是訊息沒有到達佇列中)。傳送訊息與其他操作在同一事務下提供了一致性。

侷限性

事務模式下,只有收到了Broker的TX.Commit-OK才算提交成功,否則便可以捕獲異常執行事務回滾,所以能夠解決生產者和RabbitMQ之間訊息確認的問題。但是使用事務機制下有一個缺點,在一條訊息傳送之後會使得傳送方阻塞,以等待RabbitMQ的迴應,之後才能傳送下一條訊息,嚴重降低RabbitMQ的效能與吞吐量,一般不使用。

確認機制

生產者將channel設定為確認模式(confirm),在該通道上傳送的訊息都會分配一個唯一Id(從1開始),當訊息進入到佇列中後,RabbitMQ會發送一個ack( Ack nowledgement)命令(Basic.Ack)給生產者,這樣確保生產者知道這條訊息是真的到了。

相比於事務機制下的阻塞,傳送方確認機制下是非同步的,釋出一條訊息後,可以等待通道確認的同時傳送下一條訊息,當通道收到確認時,生產者可以通過回撥方法來處理該訊息。當RabbitMQ自身內部錯誤導致訊息丟失,會發送nack( N egative  Ack nowledgement)命令(Basic.Nack),生產者可以在回撥方法中處理該命令,諸如重試,記錄等操作,所有訊息在確認模式下,都會被ack或是nack一次。

核心方法

  • channel.ConfirmSelect 設定通道為確認模式(Confirm),收到Confirm.Select-OK命令表示同意將當前通道設定為Confirm模式。

  • channel.WaitForConfirms 通道等待Broker回執確認資訊

三種模式

生產者實現confirm有三種方式,取決於confirm的時機

  • 同步確認模式

    • 單條確認模式:傳送完訊息後等待確認

    • 批量確認模式:傳送完一批訊息後等待確認

  • 非同步確認模式:傳送完訊息後,提供一個回撥方法,Broker回執了一條或多條後由生產者回調執行,非同步等待確認

單條確認模式

生產者通過呼叫channel.ConfirmSelect()將通道設定為確認模式(Confirm)。傳送訊息到RabbitMQ,當訊息進入到佇列後,傳送Basic.Ack給生產者,通道等待獲取確認資訊,執行回撥邏輯。

var connFactory = new ConnectionFactory
{
    HostName = "xxx.xxx.xxx.xxx",
    Port = 5672,
    UserName = "rabbitmqdemo",
    Password = "[email protected]",
    VirtualHost = "rabbitmqdemo"};
using (var conn = connFactory.CreateConnection())
{
    using (var channel = conn.CreateModel())
    {
        var queueName = "helloworld_producerack_singleconfirm";
        channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
        channel.ConfirmSelect();        while (true)
        {
            Console.WriteLine("訊息內容(exit退出):");
            var message = Console.ReadLine();
            if (message.Trim().ToLower() == "exit")
            {
                break;
            }
        
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
            Console.WriteLine("訊息內容傳送完畢:" + message);
            Console.WriteLine(channel.WaitForConfirms(new TimeSpan(0, 0, 1)) ? "訊息傳送成功" : "訊息傳送失敗");
        }
    }
}

訊息傳送完畢,同步等待獲取到Broker的回執確認訊息,這樣來判斷是否ack或是nack了,如此可確定生產者訊息是否真的到達了Broker。

批量確認模式

通道開啟confirm模式,與單條傳送後確認不同,批量傳送多條之後再呼叫waitForConfirms確認,這樣傳送多條之後才會等待一次確認訊息,極大提升confirm效率,但問題在於出現返回basic.nack或者超時的情況時,生產者需要將這批訊息全部重發,則會帶來明顯的重複訊息數量,並且當訊息經常丟失時,批量confirm效能應該是不升反降的。

var connFactory = new ConnectionFactory
{
    HostName = "xxx.xxx.xxx.xxx",
    Port = 5672,
    UserName = "rabbitmqdemo",
    Password = "[email protected]",
    VirtualHost = "rabbitmqdemo"};
using (var conn = connFactory.CreateConnection())
{
    using (var channel = conn.CreateModel())
    {
        var queueName = "helloworld_producerack_batchconfirm";
        channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
        channel.ConfirmSelect();

        while (true)
        {
            for (int i = 0; i < 3; i++)
            {
                Console.WriteLine("訊息內容(exit退出):");
                var message = Console.ReadLine();
                if (message.Trim().ToLower() == "exit")
                {
                    break;
                }

                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
                Console.WriteLine("訊息內容傳送完畢:" + message);
            }

            Console.WriteLine(channel.WaitForConfirms() ? "所有訊息內容傳送完畢" : "存在訊息傳送失敗");
        }
    }
}

此處對於批量,我設定三個為一組,三個執行完畢後,獲得Broker的回執確認訊息。

非同步確認模式

非同步模式下,額外增加回調方法來處理,Channel物件提供的BasicAcks,BasicNacks兩個回撥事件

  //  // Summary:  //     Signalled when a Basic.Nack command arrives from the broker.  event EventHandler<BasicNackEventArgs> BasicNacks;
  //  // Summary:  //     Signalled when a Basic.Ack command arrives from the broker.  event EventHandler<BasicAckEventArgs> BasicAcks;

BasicAckEventArg和BasicNackEventArgs都包含deliveryTag(當前Chanel在傳送訊息時給的訊息序號)。

var connFactory = new ConnectionFactory
{
    HostName = "xxx.xxx.xxx.xxx",
    Port = 5672,
    UserName = "rabbitmqdemo",
    Password = "[email protected]",
    VirtualHost = "rabbitmqdemo"};
using (var conn = connFactory.CreateConnection())
{
    using (var channel = conn.CreateModel())
    {
        var queueName = "helloworld_producerack_asyncconfirm";
        channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
        channel.ConfirmSelect();

        channel.BasicAcks += new EventHandler<BasicAckEventArgs>((o, basic) =>
        {
            Console.WriteLine($"呼叫ack回撥方法: DeliveryTag: {basic.DeliveryTag};Multiple: { basic.Multiple }");
        });
        channel.BasicNacks += new EventHandler<BasicNackEventArgs>((o, basic) =>
        {
            Console.WriteLine($"呼叫Nacks回撥方法; DeliveryTag: {basic.DeliveryTag};Multiple: { basic.Multiple }");
        });

        while (true)
        {
            Console.WriteLine("訊息內容(exit退出):");
            var message = Console.ReadLine();
            if (message.Trim().ToLower() == "exit")
            {
                break;
            }

            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
            Console.WriteLine("訊息內容傳送完畢:" + message);
        }
    }
}

生產者傳送訊息,生產者收到回執確認訊息,呼叫回撥方法

在此基礎上,我們可以為channel維護一個unconfirm的訊息序號集合,用來控制nack回撥下的失敗重試。具體實現為

  • 當釋出一條訊息時,從channel獲取當前的唯一的Id存入集合中。

  • 當回撥一次BasicAcks事件方法時,unconfirm集合刪掉相應的一條或多條記錄。

// ...channel.ConfirmSelect();

var _pendingDeliveryTags = new LinkedList<ulong>();
channel.BasicAcks += new EventHandler<BasicAckEventArgs>((o, basic) =>
{
    Console.WriteLine($"呼叫ack回撥方法: DeliveryTag: {basic.DeliveryTag};Multiple: { basic.Multiple }");
    if (_pendingDeliveryTags.Count > 0)
    {
        return;
    }

    if (!basic.Multiple)
    {
        _pendingDeliveryTags.Remove(basic.DeliveryTag);
        return;
    }

    while (_pendingDeliveryTags.First.Value < basic.DeliveryTag)
    {
        _pendingDeliveryTags.RemoveFirst();
    }

    if (_pendingDeliveryTags.First.Value == basic.DeliveryTag)
    {
        _pendingDeliveryTags.RemoveFirst();
    }
});
channel.BasicNacks += new EventHandler<BasicNackEventArgs>((o, basic) =>
{
    Console.WriteLine($"呼叫Nacks回撥方法; DeliveryTag: {basic.DeliveryTag};Multiple: { basic.Multiple }");    if (_pendingDeliveryTags.Count > 0)
    {        return;
    }    if (!basic.Multiple)
    {
        _pendingDeliveryTags.Remove(basic.DeliveryTag);        return;
    }    while (_pendingDeliveryTags.First.Value < basic.DeliveryTag)
    {
        _pendingDeliveryTags.RemoveFirst();
    }    if (_pendingDeliveryTags.First.Value == basic.DeliveryTag)
    {
        _pendingDeliveryTags.RemoveFirst();
    }

    //Todo 執行訊息重發});for (int i = 0; i < 5000; i++)
{
    var message = "第" + (i + 1) + "條訊息";
    var body = Encoding.UTF8.GetBytes(message);
    var nextSeqNo = channel.NextPublishSeqNo;
    channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
    _pendingDeliveryTags.AddLast(nextSeqNo);
    Console.WriteLine("訊息內容傳送完畢:" + message);
}

非同步模式下,當生產者持續向RabbitMQ傳送訊息,可能生產者端並不會收到100個ack訊息 ,也許收到1個,或者2個,或N個ack訊息,這些ack訊息的multiple都為true,ack訊息中的deliveryTag的值的含義也不是一樣的,當multiple為true,deliveryTag代表的值是一批的id,像如下圖中的4905,則代表是4904和4905兩個唯一Id值。而當multiple為false時,則deliveryTag就僅代表一個唯一的Id值。

從效能方面比對這幾種方式,批量和非同步確認更勝一籌。可從程式碼複雜方面來講,事務機制或是單條確認模式都簡單,非同步確認模式反而是更復雜。而當遇到批量訊息需要重發場景時,批量確認模式卻又會存在效能不升反降。因此,也無絕對的優劣,適合的才是最好的。

消費者可靠接收

生產者將訊息可靠傳遞到了RabbitMQ,RabbitMQ將訊息推送給消費者,消費者端如何確保訊息可靠接收了呢? RabbitMQ提供了訊息確認機制,消費者在訂閱佇列時,可以指定autoAck引數。

AutoAck引數

  • 當autoAck為false時,RabbitMQ會等待消費者顯示的回覆確認訊號後再從記憶體(或磁碟)中移除訊息(先標記再刪除)。

  • 當autoAck為true時,RabbitMQ會把訊息傳送出去的訊息自動確認再從記憶體(或磁碟)中刪除,消費者是否接收到了訊息不管。

RabbitMQ的Web管理平臺上可以看到當前佇列是否AutoAck。當AutoAck設定為false時,面板中展示Ack required為勾選。同時佇列中的訊息分成了兩部分:

  • 一部分是等待投遞給消費者的訊息數 Ready

  • 一部分是已經投遞給消費者,但是未收到消費者確認訊號的訊息數 Unacked

如果 RabbitMQ 伺服器端一直沒有收到消費者的確認訊號,並且消費此訊息的消費者已經斷開連線,則伺服器端會安排該訊息重新進入佇列,等待投遞給下一個消費者(也可能還是原來的那個消費者)。

  • 採用訊息確認機制後,只要設定 autoAck 引數為 false,消費者就有足夠的時間處理訊息(任務),不用擔心處理訊息過程中消費者程序掛掉後訊息丟失的問題,因為 RabbitMQ 會一直等待持有訊息直到消費者顯式呼叫 Basic.Ack 命令為止。

  • RabbitMQ 不會為未確認的訊息設定過期時間,判斷此訊息是否需要重新投遞給消費者的唯一依據是消費該訊息連線是否已經斷開,這個設定的原因是 RabbitMQ 允許消費者消費一條訊息的時間可以很久

核心方法

  • BasicAck 用於確認當前消

  • BasicNack 用於拒絕當前訊息,可以執行批量拒絕

  • BasicReject 用於拒絕當前訊息,僅拒絕一條訊息

手動確認

生產者端將100條訊息存入佇列中,可以通過Web面板看到已有Total/Ready都是100條。

消費者端設定消費能力Qos為5,同時channel.BasicConsume 設定autoAck為false

var connFactory = new ConnectionFactory
{
    HostName = "xxx.xxx.xxx.xxx",
    Port = 5672,
    UserName = "rabbitmqdemo",
    Password = "[email protected]",
    VirtualHost = "rabbitmqdemo"};
using (var conn = connFactory.CreateConnection())
{
    using (var channel = conn.CreateModel())
    {
        var queueName = "helloworld_consumerack";
        channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
        channel.BasicQos(0, 5, false);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            Thread.Sleep(10000);
            var message = ea.Body;
            Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message.ToArray()));

            ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, false);
        };

        channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
        Console.ReadKey();
    }
}

執行程式碼,在web面板中可以開始看到待確認數為5,即RabbitMQ按照消費者最大負載能力推送訊息,同時等待消費者手動確認。

這樣,能夠可靠的確定消費者著實接收到了訊息。當消費者因處理業務發生異常下或是因網路不穩定、伺服器異常等,消費者沒有反饋Ack,這樣訊息一直會處在佇列中,這樣就確保了有其他消費者(也許還是自身)有機會執行重試,確保業務正常,僅僅當RabbitMQ收到消費者的Ack反饋後才會刪除。

自動確認

RabbitMQ訊息確認機制(ACK)預設是自動確認的,自動確認會在訊息傳送給消費者後立即確認,但存在丟失訊息的可能。

生產者傳送100條訊息到RabbitMQ中等待消費。

消費者端設定自動確認Ack,接收訊息中設定了當讀取到第50條訊息時丟擲異常。

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    Thread.Sleep(1000);
    var message = ea.Body;
    Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message.ToArray()));

    if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
    {
        Console.WriteLine("異常");
        throw new Exception("模擬異常");
    }

    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, false);
};

channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);

當執行程式,訊息全部會推送到消費者中(basicQos在AutoAck下無效),出現異常時,RabbitMQ中也不會留有訊息,這樣一來,訊息就丟失了。

這種不安全的模式,使用場景受限,訊息易丟失。同時,由於自動確認下,設定Qos無效,當訊息數量過多時,RabbitMQ傳送到消費者端的訊息太快,數量太多,又可能造成消費者端訊息擠壓而耗盡記憶體導致程序終止。

2022-08-23,望技術有成後能回來看見自己的腳步