面試官:引入RabbitMQ後,你如何保證全鏈路資料100%不丟失?

語言: CN / TW / HK

作者:指尖涼

來源:blog.csdn.net/hsz2568952354/article/details/86559470

我們都知道,訊息從生產端到消費端消費要經過3個步驟:

  1. 生產端傳送訊息到RabbitMQ;

  2. RabbitMQ傳送訊息到消費端;

  3. 消費端消費這條訊息;

這3個步驟中的每一步都有可能導致訊息丟失,訊息丟失不可怕,可怕的是丟失了我們還不知道,所以要有一些措施來保證系統的可靠性。這裡的可靠並不是一定就100%不丟失了,磁碟損壞,機房爆炸等等都能導致資料丟失,當然這種都是極小概率發生,能做到99.999999%訊息不丟失,就是可靠的了。下面來具體分析一下問題以及解決方案。

生產端可靠性投遞

生產端可靠性投遞,即生產端要確保將訊息正確投遞到RabbitMQ中。生產端投遞的訊息丟失的原因有很多,比如訊息在網路傳輸的過程中發生網路故障訊息丟失,或者訊息投遞到RabbitMQ時RabbitMQ掛了,那訊息也可能丟失,而我們根本不知道發生了什麼。針對以上情況,RabbitMQ本身提供了一些機制。

事務訊息機制

事務訊息機制由於會嚴重降低效能,所以一般不採用這種方法,我就不介紹了,而採用另一種輕量級的解決方案——confirm訊息確認機制。

confirm訊息確認機制

什麼是confirm訊息確認機制?顧名思義,就是生產端投遞的訊息一旦投遞到RabbitMQ後,RabbitMQ就會發送一個確認訊息給生產端,讓生產端知道我已經收到訊息了,否則這條訊息就可能已經丟失了,需要生產端重新發送訊息了。

通過下面這句程式碼來開啟確認模式:

channel.confirmSelect();// 開啟發送方確認模式

然後非同步監聽確認和未確認的訊息:

channel.addConfirmListener(new ConfirmListener() {
//訊息正確到達broker
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("已收到訊息");
//做一些其他處理
}

//RabbitMQ因為自身內部錯誤導致訊息丟失,就會發送一條nack訊息
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("未確認訊息,標識:" + deliveryTag);
//做一些其他處理,比如訊息重發等
}
});

這樣就可以讓生產端感知到訊息是否投遞到RabbitMQ中了,當然這樣還不夠,稍後我會說一下極端情況。

訊息持久化

那訊息持久化呢?我們知道,RabbitMQ收到訊息後將這個訊息暫時存在了記憶體中,那這就會有個問題,如果RabbitMQ掛了,那重啟後資料就丟失了,所以相關的資料應該持久化到硬碟中,這樣就算RabbitMQ重啟後也可以到硬碟中取資料恢復。那如何持久化呢?

message訊息到達RabbitMQ後先是到exchange交換機中,然後路由給queue佇列,最後傳送給消費端。

所有需要給exchange、queue和message都進行持久化:

exchange持久化:

//第三個引數true表示這個exchange持久化
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);

queue持久化:

//第二個引數true表示這個queue持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

message持久化:

//第三個引數MessageProperties.PERSISTENT_TEXT_PLAIN表示這條訊息持久化
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));

這樣,如果RabbitMQ收到訊息後掛了,重啟後會自行恢復訊息。

到此,RabbitMQ提供的幾種機制都介紹完了,但這樣還不足以保證訊息可靠性投遞RabbitMQ中,上面我也提到了會有極端情況,比如RabbitMQ收到訊息還沒來得及將訊息持久化到硬碟時,RabbitMQ掛了,這樣訊息還是丟失了,或者RabbitMQ在傳送確認訊息給生產端的過程中,由於網路故障而導致生產端沒有收到確認訊息,這樣生產端就不知道RabbitMQ到底有沒有收到訊息,就不好做接下來的處理。

所以除了RabbitMQ提供的一些機制外,我們自己也要做一些訊息補償機制,以應對一些極端情況。接下來我就介紹其中的一種解決方案——訊息入庫。

訊息入庫

訊息入庫,顧名思義就是將要傳送的訊息儲存到資料庫中。

首先發送訊息前先將訊息儲存到資料庫中,有一個狀態欄位status=0,表示生產端將訊息傳送給了RabbitMQ但還沒收到確認;在生產端收到確認後將status設為1,表示RabbitMQ已收到訊息。這裡有可能會出現上面說的兩種情況,所以生產端這邊開一個定時器,定時檢索訊息表,將status=0並且超過固定時間後(可能訊息剛發出去還沒來得及確認這邊定時器剛好檢索到這條status=0的訊息,所以給個時間)還沒收到確認的訊息取出重發(第二種情況下這裡會造成訊息重複,消費者端要做冪等性),可能重發還會失敗,所以可以做一個最大重發次數,超過就做另外的處理。

這樣訊息就可以可靠性投遞到RabbitMQ中了,而生產端也可以感知到了。

消費端訊息不丟失

既然已經可以讓生產端100%可靠性投遞到RabbitMQ了,那接下來就改看看消費端的了,如何讓消費端不丟失訊息。

預設情況下,以下3種情況會導致訊息丟失:

  • 在RabbitMQ將訊息發出後,消費端還沒接收到訊息之前,發生網路故障,消費端與RabbitMQ斷開連線,此時訊息會丟失;

  • 在RabbitMQ將訊息發出後,消費端還沒接收到訊息之前,消費端掛了,此時訊息會丟失;

  • 消費端正確接收到訊息,但在處理訊息的過程中發生異常或宕機了,訊息也會丟失。

其實,上述3中情況導致訊息丟失歸根結底是因為RabbitMQ的自動ack機制,即預設RabbitMQ在訊息發出後就立即將這條訊息刪除,而不管消費端是否接收到,是否處理完,導致消費端訊息丟失時RabbitMQ自己又沒有這條訊息了。

所以就需要將自動ack機制改為手動ack機制。

消費端手動確認訊息:

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
//接收到訊息,做處理
//手動確認
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (Exception e) {
//出錯處理,這裡可以讓訊息重回佇列重新發送或直接丟棄訊息
}
};
//第二個引數autoAck設為false表示關閉自動確認機制,需手動確認
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});

這樣,當autoAck引數置為false,對於RabbitMQ服務端而言,佇列中的訊息分成了兩個部分:一部分是等待投遞給消費端的訊息;一部分是已經投遞給消費端,但是還沒有收到消費端確認訊號的訊息。如果RabbitMQ一直沒有收到消費端的確認訊號,並且消費此訊息的消費端已經斷開連線或宕機(RabbitMQ會自己感知到),則RabbitMQ會安排該訊息重新進入佇列(放在佇列頭部),等待投遞給下一個消費者,當然也有能還是原來的那個消費端,當然消費端也需要確保冪等性。

好了,到此從生產端到RabbitMQ再到消費端的全鏈路,就可以保證資料的不丟失。

由於個人水平有限,有些地方可能理解錯了或理解不到位的,請大家多多指出!Thanks