深入解析Apache Pulsar系列(一):客户端消息確認

語言: CN / TW / HK

解析 Apache Pulsar —— 客户端消息確認

作者介紹:

騰訊雲中間件專家工程師

Apache Pulsar PMC,《深入解析Apache Pulsar》作者。

目前專注於中間件領域,在消息隊列和微服務方向具有豐富的經驗。

負責 CKafka、TDMQ的設計與開發工作,目前致力於打造穩定、高效和可擴展的基礎組件與服務。

導語

在 Apache Pulsar 中,為了避免消息的重複投遞,消費者進行消息確認是非常重要的一步。當一條消息被消費者消費後,需要消費者發送一個Ack請求給Broker,Broker才會認為這條消息被真正消費掉。被標記為已經消費的消息,後續不會再次重複投遞給消費者。在這篇文章中,我們會介紹Pulsar中消息確認的模式,以及正常消息確認在Broker側是如何實現的。

1 確認消息的模式

在瞭解Pulsar消息確認模式之前,我們需要先了解一些前置知識 —— Pulsar中的訂閲以及遊標(Cursor)。Pulsar中有多種消費模式,如:Share、Key_share、Failover等等,無論用户使用哪種消費模式都會創建一個訂閲。訂閲分為持久訂閲和非持久訂閲,對於持久訂閲,Broker上會有一個持久化的Cursor,即Cursor的元數據被記錄在ZooKeeper。Cursor以訂閲(或稱為消費組)為單位,保存了當前訂閲已經消費到哪個位置了。因為不同消費者使用的訂閲模式不同,可以進行的ack行為也不一樣。總體來説可以分為以下幾種Ack場景:

(1)單條消息確認(Acknowledge)

和其他的一些消息系統不同,Pulsar支持一個Partition被多個消費者消費。假設消息1、2、3發送給了Consumer-A,消息4、5、6發送給了Consumer-B,而Consumer-B又消費的比較快,先Ack了消息4,此時Cursor中會單獨記錄消息4為已Ack狀態。如果其他消息都被消費,但沒有被Ack,並且兩個消費者都下線或Ack超時,則Broker會只推送消息1、2、3、5、6,已經被Ack的消息4不會被再次推送。

(2)累積消息確認(AcknowledgeCumulative)

假設Consumer接受到了消息1、2、3、4、5,為了提升Ack的性能,Consumer可以不分別Ack 5條消息,只需要調用AcknowledgeCumulative,然後把消息5傳入,Broker會把消息5以及之前的消息全部標記為已Ack。

(3)批消息中的單個消息確認(Acknowledge)

這種消息確認模式,調用的接口和單條消息的確認一樣,但是這個能力需要Broker開啟配置項AcknowledgmentAtBatchIndexLevelEnabled。當開啟後,Pulsar可以支持只Ack一個Batch裏面的某些消息。假設Consumer拿到了一個批消息,裏面有消息1、2、3,如果不開啟這個選項,我們只能消費整個Batch再Ack,否則Broker會以批為單位重新全部投遞一次。前面介紹的選項開啟之後,我們可以通過Acknowledge方法來確認批消息中的單條消息。

(4)否定應答(NegativeAcknowledge)

客户端發送一個RedeliverUnacknowledgedMessages命令給Broker,明確告知Broker,當前Consumer無法消費這條消息,消息將會被重新投遞。

並不是所有的訂閲模式下都能用上述這些ack行為,例如:Shared或者Key_shared模式下就不支持累積消息確認(AcknowledgeCumulative)。因為在Shared或者Key_Shared模式下,前面的消息不一定是被當前Consumer消費的,如果使用AcknowledgeCumulative,會把別人的消息也一起確認掉。訂閲模式與消息確認之間的關係如下所示:

訂閲模式 單條Ack 累積Ack 批量消息中單個Ack 否定Ack
Exclusive 支持 支持 支持 不支持
Shared 支持 不支持 支持 支持
Failover 支持 支持 支持 不支持
Key_Shared 支持 不支持 支持 支持

2 Acknowledge與AcknowledgeCumulative的實現

Acknowledge與AcknowledgeCumulative接口不會直接發送消息確認請求給Broker,而是把請求轉交給AcknowledgmentsGroupingTracker處理。這是我們要介紹的Consumer裏的第一個Tracker,它只是一個接口,接口下有兩個實現,一個是持久化訂閲的實現,另一個是非持久化訂閲的實現。由於非持久化訂閲的Tracker實現都是空,即不做任何操作,因此我們只介紹持久化訂閲的實現——PersistentAcknowledgmentsGroupingTracker。

在Pulsar中,為了保證消息確認的性能,並避免Broker接收到非常高併發的Ack請求,Tracker中默認支持批量確認,即使是單條消息的確認,也會先進入隊列,然後再一批發往Broker。我們在創建Consumer時可以設置參數AcknowledgementGroupTimeMicros,如果設置為0,則Consumer每次都會立即發送確認請求。所有的單條確認(individualAck)請求會先放入一個名為PendingIndividualAcks的Set,默認是每100ms或者堆積的確認請求超過1000,則發送一批確認請求。

消息確認的請求最終都是異步發送出去,如果Consumer設置了需要回執(Receipt),則會返回一個CompletableFuture,成功或失敗都能通過Future感知到。默認都是不需要回執的,此時直接返回一個已經完成的CompletableFuture。

對於Batch消息中的單條確認(IndividualBatchAck),用一個名為PendingIndividualBatchIndexAcks的Map進行保存,而不是普通單條消息的Set。這個Map的Key是Batch消息的MessageId,Value是一個BitSet,記錄這批消息裏哪些需要Ack。使用BitSet能大幅降低保存消息Id的能存佔用,1KB能記錄8192個消息是否被確認。由於BitSet保存的內容都是0和1,因此可以很方便地保存在堆外,BitSet對象也做了池化,可以循環使用,不需要每次都創建新的,對內存非常友好。

如下圖所示,只用了8位,就表示了Batch裏面8條消息的Ack情況,下圖表示EntryId為0、2、5、6、7的Entry都被確認了,確認的位置會被置為1:

對於累計確認(CumulativeAck)實現方式就更簡單了,Tracker中只保存最新的確認位置點即可。例如,現在Tracker中保存的CumulativeAck位置為5:10,代表該訂閲已經消費到LedgerId=5,EntryId=10的這條消息上了。後續又ack了一個5:20,則直接替換前面的5:10為5:20即可。

最後就是Tracker的Flush,所有的確認最終都需要通過觸發flush方法發送到Broker,無論是哪種確認,Flush時創建的都是同一個命令併發送給Broker,不過傳參中帶的AckType會不一樣。

3 NegativeAcknowledge的實現

否定應答和其他消息確認一樣,不會立即請求Broker,而是把請求轉交給NegativeAcksTracker進行處理。Tracker中記錄着每條消息以及需要延遲的時間。Tracker複用了PulsarClient的時間輪,默認是33ms左右一個時間刻度進行檢查,默認延遲時間是1分鐘,抽取出已經到期的消息並觸發重新投遞。Tracker主要存在的意義是為了合併請求。另外如果延遲時間還沒到,消息會暫存在內存,如果業務側有大量的消息需要延遲消費,還是建議使用ReconsumeLater接口。NegativeAck唯一的好處是,不需要每條消息都指定時間,可以全局設置延遲時間。

4 未確認消息的處理

如果消費者獲取到消息後一直不Ack會怎麼樣?這要分兩種情況,第一種是業務側已經調用了Receive方法,或者已經回調了正在異步等待的消費者,此時消息的引用會被保存進UnAckedMessageTracker,這是Consumer裏的第三個Tracker。UnAckedMessageTracker中維護了一個時間輪,時間輪的刻度根據AckTimeoutTickDurationInMs這兩個參數生成,每個刻度時間=AckTimeout / TickDurationInMs。新追蹤的消息會放入最後一個刻度,每次調度都會移除隊列頭第一個刻度,並新增一個刻度放入隊列尾,保證刻度總數不變。每次調度,隊列頭刻度裏的消息將會被清理,UnAckedMessageTracker會自動把這些消息做重投遞。

重投遞就是客户端發送一個RedeliverUnacknowledgedMessages命令給Broker。每一條推送給消費者但是未Ack的消息,在Broker側都會有一個集合來記錄(PengdingAck),這是用來避免重複投遞的。觸發重投遞後,Broker會把對應的消息從這個集合裏移除,然後這些消息就可以再次被消費了。注意,當重投遞時,如果消費者不是Share模式是無法重投遞單條消息的,只能把這個消費者所有已經接收但是未Ack的消息全部重新投遞。下圖是一個時間輪的簡單示例:

另外一種情況就是消費者做了預拉取,但是還沒調用過任何Receive方法,此時消息會一直堆積在本地隊列。預拉取是客户端SDK的默認行為,會預先拉取消息到本地,我們可以在創建消費者時通過ReceiveQueueSize參數來控制預拉取消息的數量。Broker側會把這些已經推送到Consumer本地的消息記錄為PendingAck,並且這些消息也不會再投遞給別的消費者,且不會Ack超時,除非當前Consumer被關閉,消息才會被重新投遞。Broker側有一個RedeliveryTracker接口,暫時的實現是內存追蹤(InMemoryRedeliveryTracker)。這個Tracker會記錄消息到底被重新投遞了多少次,每條消息推送給消費者時,會先從Tracker的哈希表中查詢一下重投遞的次數,和消息一併推送給消費者。

由上面的邏輯我們可以知道,創建消費者時設置的ReceiveQueueSize真的要慎重,避免大量的消息堆積在某一個Consumer的本地預拉取隊列,而其他Consumer又沒有消息可消費。PulsarClient上可以設置啟用ConsumerStatsRecorder,啟用後,消費者會在固定間隔會打印出當前消費者的metrics信息,例如:本地消息堆積量、接受的消息數等,方便業務排查性能問題。

尾聲

Pulsar中的設計細節非常多,由於篇幅有限,作者會整理一系列的文章進行技術分享,敬請期待。如果各位希望系統性地學習Pulsar,可以購買作者出版的新書《深入解析Apache Pulsar》。