Message deduplication 這裡的去重與你想的可能不一樣|Apache Pulsar 技術系列

語言: CN / TW / HK

導語

Apache Pulsar 是一個多租戶、高效能的服務間訊息傳輸解決方案,支援多租戶、低延時、讀寫分離、跨地域複製、快速擴容、靈活容錯等特性。騰訊雲內部 Pulsar工作組對 Pulsar 做了深入調研以及大量的效能和穩定性方面優化,目前已經在騰訊內部業務TDBank落地上線。本文是Pulsar技術系列中的一篇,主要介紹Pulsar 的 Message Deduplication 特性,供大家參考,避免在使用過程中踩坑。

Message Deduplication背景介紹

訊息中介軟體產品設計中,對訊息的投遞設計,一般參照Kafka中提出的三種投遞語意,分別為:

至多一次 (at-most-once)

至少一次 (at-least-once)

精確一次(或恰好一次) (exactly-once )

理解上需要注意的是,這裡都是對投遞行為的限定描述。

至多一次:客戶端在生產訊息的時候,僅會對生產的訊息投遞一次,這裡並不保證訊息一定生產成功。

至少一次:客戶端在生產訊息的時候,在收到一次成功的響應之前,可能會投遞多次。這種場景下,伺服器端可能存在多條重複的訊息。

精確一次(或恰好一次):客戶端在生產訊息的時候,針對這次生產,伺服器端保證有且僅儲存一份訊息。這裡的 “這次生產”,一般都是指的是客戶端對一次“SendMessage”的呼叫。這種語意下,伺服器一般不會處理多次對相同訊息體呼叫生產,產生重複訊息的場景。簡單而言,就是“精確一次”並不等於訊息去重複。

許多系統聲稱提供“exactly-once”的交付語義,但仔細閱讀其宣告會發現,一些系統的宣告可能存在一定的誤導性,我們需要考慮它們在生產超時,部分副本寫入成功,部分失敗等場景下對語意的保證。

目前業界,絕大多數的訊息中介軟體產品,如Kafka、RocketMQ、Pulsar、InLong-Tube、RabbitMQ、ActiveMQ等,都支援at-least-once(至少一次)的投遞語意,即生產成功的訊息,伺服器端至少能保證儲存一份,消費者至少能消費到一份訊息。但是,對exactly-once(精確一次)語意支援的產品還是比較少。

下面,我們著重介紹一下Pulsar的Message Deduplication(相當於對exactly-once的一種實現)功能,可能與你想的並不一樣。

Pulsar 的訊息去重(Message deduplication)

功能配置

Pulsar提供的Message Deduplication 功能,預設是關閉的。開啟時,需要修改Broker 端的配置,另外客戶端也需要新增少許的配置。 (詳情可參考pulsar的官網

開啟Message Deduplictiaon能力,首先,Broker 端需要變更如下配置:

`#是否開啟message deduplication功能

brokerDeduplicationEnabled#deduplication功能下,生產者的數量限制

brokerDeduplicationMaxNumberOfProducers

#broker端生成deduplication 快照資訊的間隔

brokerDeduplicationEntriesInterval

#生產者斷鏈後,broker端deduplication資訊儲存的時長

brokerDeduplicationProducerInactivityTimeoutMinutes`

其次,生產者客戶端需要做如下變更:

  1、為生產者指定一個名稱。

  2、配置訊息生產超時為0(預設為30s)。

程式碼示例如下:

`PulsarClient pulsarClient = PulsarClient.builder()

         .serviceUrl("pulsar://localhost:6650")
         .build();

Producer producer = pulsarClient.newProducer()

         .producerName("producer-1")
		 .topic("persistent://public/default/topic-1")
         .sendTimeout(0, TimeUnit.SECONDS)
         .create();`

功能原理

客戶端對每一個傳送的訊息請求,都會採用遞增方式生成一個唯一的Sequence ID編號,這個資訊會被放置在Message 的元資料中,傳輸到Broker端。同時,客戶端Producer 也會維護一個傳送的PendingMessages佇列,當收到Broker端返回的傳送Ack 資訊後,將PendingMessages中相同Sequence ID的資訊移除,客戶端認為傳送的這個訊息生產成功。 當Broker開啟Message Deduplication 功能後,Broker對對每個收到的訊息請求進行是否重複的判斷。

判斷的邏輯如下: 1、Broker端針對每個生產者,以生產者名字為key,分當前接收到的和已經處理完成的兩個維度儲存生產訊息的最大Sequence ID資訊:

/*當前已經接受不了到的*/ ConcurrentOpenHashMap<String, Long> highestSequencedPushed /*當前已經儲存處理過的*/ ConcurrentOpenHashMap<String, Long> highestSequencedPersisted

2、Broker端每收到一個生產Message的請求,會進行是否重複的判斷,即收到的最新的Sequence ID是否大於Broker 端儲存的兩維度下相同ProducerName下的Sequence ID,如果大於則不重複,如果小於或等於則訊息重複。訊息重複時,Broker端會直接返回,不會繼續走後續的儲存處理流程。

由上面Pulsar 的Message Depulication feature 相關的配置和實現原理的介紹。可知,Pulsar Broker端的Message Depulication 功能,並不是對訊息體的去重,而是客戶端在不配置超時時間的前提下,Broker 端在一定的時間範圍內,對同一個生產者名稱下的客戶端投遞的具有相同Sequence id的訊息的唯一行保證。

總結

Kafka 在0.11.0.0版本之後,針對Topic之內和多個Topic之間兩種場景下的exactly-once語意,分別提供了支援傳遞冪等性處理的選項和類事物訊息的處理方式進行保證。有興趣的同學可以參展kafka的原始碼和官網介紹

Pulsar的Message Deduplication feature與Kafka的單Topic下對exaxtly-once語意的保證在實現方式上類似,也可以認為是對exaxtly-once語意的一種實現。

這裡需要著重注意的是,exaxtly-once不等於訊息去重。在實際的開發中,生產和消費部分都有可能產生重複的訊息。

訊息的生產者,在收到明確的訊息生產成功的確認之前,訊息在伺服器端的儲存狀態是不確定的。

例如,在一定時間內,生產者沒有收到生產的響應,選擇了重發,這時,伺服器端就可能有兩份甚至多份訊息的副本。

此外,消費部分在如下幾個場景也有可能獲取到重複推送的訊息:

  1、消費者重啟時,已經消費,但是Broker端未收到Ack或消費者沒有觸發Ack;

  2、Broker重啟,因為消費者的Ack資訊並不是實時儲存的,Broker重啟後可能會有少量的已經消費的訊息會被重複推送;

  3、消費出現異常,客戶端使用reconsumerLater或negativeAck方式進行確認,這時Broker會重新推送訊息。

因此,大家在選用訊息中介軟體的特性時,需要注意相關的場景和限制。避免因為重複訊息對業務產生不必要的影響。

one more thing

騰訊雲基於 Apache Pulsar 自研的訊息中介軟體--TDMQ Pulsar 版,具備極好的雲原生和 Serverless 特性,相容 Pulsar 的各個元件與概念,具備計算儲存分離,靈活擴縮容的底層優勢。目前TDMQ Pulsar 版已開始商業化,對Pulsar感興趣的使用者可以進入官網瞭解詳情。