深入解析Apache Pulsar系列(二) —— Broker消息確認的管理

語言: CN / TW / HK

作者簡介

林琳 騰訊雲中間件專家工程師 Apache Pulsar PMC,《深入解析Apache Pulsar》作者。目前專注於中間件領域,在消息隊列和微服務方向具有豐富的經驗。負責TDMQ的設計與開發工作,目前致力於打造穩定、高效和可擴展的基礎組件與服務。

導語

我們在之前的《深入解析Apache Pulsar系列之一 —— 客户端消息確認》中介紹過Apache Pulsar客户端的多種消息確認模式。這篇文章中,我們將介紹Broker側對於消息確認的管理。

客户端通過消息確認機制通知Broker某些消息已經被消費,後續不要再重複推送。Broker側則使用遊標來存儲當前訂閲的消費位置信息,包含了消費位置中的所有元數據,避免Broker重啟後,消費者要從頭消費的問題。Pulsar中的訂閲分為持久訂閲和非持久訂閲,他們之間的區別是:持久訂閲的遊標(Cursor)是持久化的,元數據會保存在ZooKeeper,而非持久化遊標只保存在Broker的內存中。

遊標的簡介

Pulsar中每個訂閲都會包含一個遊標,如果多個消費者擁有相同的訂閲名(消費組),那這些消費者們會共享一個遊標。遊標的共享又和消費者的消費模式有關,如果是Exclusive或者FailOver模式的訂閲,那同一時間只有一個消費者使用這個遊標。如果是Shared或者Key_Shared模式的訂閲,那多個消費者會同時使用這個遊標。

每當消費者Ack一條消息,遊標中指針的位置都有可能會變化,為什麼説是有可能呢?這涉及到我們在客户端章節介紹的Acknowledge的方式:單條消息確認(Acknowledge)、批消息中的單個消息確認(Acknowledge)、累積消息確認(AcknowledgeCumulative)。否定應答(NegativeAcknowledge)不涉及遊標的變化,因此不在討論範圍之內。 我們先看單條消息的確認,如果是獨佔式的消費,每確認一條消息,遊標位置都會往後移動一個Entry,如下圖所示:

累積消息確認,只需要確認一條消息,遊標可以往後移動多個Entry,如:Consumer-1累積確認了Entry-4,則從0開始的Entry都會被確認,如下圖所示:

對於共享式的消費,因為有多個消費者同時消費消息,因此消息的確認可能會出現空洞,空洞如下圖所示:

這裏也解釋了為什麼MarkeDeletePosition指針的位置可能發生變化,我們可以從共享式的消費中看到,消息確認是可能出現空洞的,只有當前面所有的Entry都被消費並確認,MarkeDeletePosition指針才會移動。如果存在空洞,MarkeDeletePosition指針是不會往後移動的。那這個MarkeDeletePosition指針和遊標是什麼關係呢?遊標是一個對象,裏面包含了多個屬性,MarkeDeletePosition指針只是遊標的其中一個屬性。正如上面所説的Ack空洞,在遊標中有另外專門的方式進行存儲。如果我們不單獨存儲空洞,那Broker重啟後,消費者只能從MarkDeletePosition開始消費,會存在重複消費的問題。如果以上圖為例,Broker重啟後Entry-4就會被重複消費。當然,Pulsar中對空洞信息是有單獨存儲的。

然後,我們看看遊標裏到底記錄了什麼元數據,此處只列出一些關鍵的屬性:

屬性名 描述
bookkeeper Bookkeeper Client的引用,主要用來打開Ledger,例如:讀取歷史數據,可以打開已經關閉的Ledger;當前Ledger已經寫滿,打開一個新的Ledger
markDeletePosition 標記可刪除的位置,在這個位置之前的所有Entry都已經被確認了,因此這個位置之前的消息都是可刪除狀態
persistentMarkDeletePosition markDeletePosition是異步持久化的,這個屬性記錄了當前已經持久化的markDeletePosition。當markDeletePosition不可用時,會以這個位置為準。這個位置會在遊標recovery時初始化,後續在持久化成功後不斷更新
readPosition 訂閲當前讀的位置,即使有多個消費者,讀的位置肯定是嚴格有序的,只不過消息會分給不同的消費者而已。讀取的位置會在遊標恢復(recovery)時初始化,消費時會不斷更新
lastMarkDeleteEntry 最後被標記為刪除的Entry,即markDeletePosition指向的Entry
cursorLedger cursor在Zookeeper中只會保存索引信息,具體的Ack數據會比較大,因此會保存到Bookkeeper中,這個屬性持有了對應Ledger的引用
individualDeletedMessages 用於保存Ack的空洞信息
batchDeletedIndexes 用於保存批量消息中單條消息Ack信息

看到CursorLedger,説明數據被保存到了Bookkeeper中。有的讀者可能會有疑問,既然數據都保存到Bookkeeper中了,那ZooKeeper中保存的Cursor信息有什麼用呢?我們可以認為ZooKeeper中保存的遊標信息只是索引,包含了以下幾個屬性:

  • 當前的cursorLedger名以及ID,用於打開Bookkeeper中的Ledger;
  • LastMarkDeleteEntry,最後被標記為刪除的Entry信息,裏面包含了LedgerId和EntryId;
  • 遊標最後的活動時間戳。

遊標保存到ZooKeeper的時機有幾個:

  • 當cursor被關閉時;
  • 當發生Ledger切換導致cursorLedger變化時;
  • 當持久化空洞數據到Bookkeeper失敗並嘗試持久化空洞數據到ZooKeeper時。

我們可以把ZooKeeper中的遊標信息看作Check Point,當恢復數據時,會先從ZooKeeper中恢復元數據,獲取到Bookkeeper Ledger信息,然後再通過Ledger恢復最新的LastMarkDeleteEntry位置和空洞信息。

既然遊標不會實時往ZooKeeper中寫入數據,那是如何保證消費位置不丟失的呢?

Bookkeeper中的一個Ledger能寫很多的Entry,因此高頻的保存操作都由Bookkeeper來承擔了,ZooKeeper只負責存儲低頻的索引更新。

消息空洞的管理

在遊標對象中,使用了一個IndividualDeletedMessages容器來存儲所有的空洞信息。得益於Java中豐富的輪子生態,Broker中直接使用了Guava Range這個庫來實現空洞的存儲。舉個例子,假設在Ledger-1中我們的空洞如下:

則我們存儲的空洞信息如下,即會用區間來表示已經連續Ack的範圍: [ (1:-1, 1:2] , (1:3, 1:6] ]

使用區間的好處是,可以用很少的區間數來表示整個Ledger的空洞情況,而不需要每個Entry都記錄。當某個範圍都已經被消費且確認了,會出現兩個區間合併為一個區間,這都是Guava Range自動支持的能力。如果從當前MarkDeletePosition指針的位置到後面某個Entry為止,都連成了一個區間,則MarkDeletePosition指針就可以往後移動了。

記錄了這些消息空洞之後,是如何用來避免消息重複消費的呢?

當Broker從Ledger中讀取到消息後,會進入一個清洗階段,如:過濾掉延遲消息等等。在這個階段,Broker會遍歷所有消息,看消息是否存在於Range裏,如果存在,則説明已經被確認過了,這條消息會被過濾掉,不再推送給客户端。Guava Range提供了Contains接口,可以快速查看某個位置是否落在區間裏。這種Entry需要被過濾的場景,基本上只會出現在Broker重啟後,此時遊標信息剛恢復。當ReadPosition超過了這段空洞的位置時,就不會出現讀到重複消息要被過濾的情況了。

然後,我們來看看IndividualDeletedMessages這個容器的實現。

IndividualDeletedMessages 的類型是LongPairRangeSet,默認實現是DefaultRangeSet,是一個基於Google Guava Range包裝的實現類。另外一個Pulsar自己實現的優化版:ConcurrentOpenLongPairRangeSet。優化版的RangeSet和Guava Range的存儲方式有些不一樣,Guava Range使用區間來記錄數據,優化版RangeSet對外提供的接口也是Range,但是內部使用了BitSet來記錄每個Entry是否被確認。

優化版RangeSet在空洞較多的情況下對內存更加友好。我們可以假設一個場景,有100W的消息被拉取,但是隻有50W的消息已經被Ack,並且每隔一條消息Ack一條,這樣就會出現50W個空洞。此時的Range就無法發揮區間的優勢了,會出現50W個Range對象,如下圖所示。而優化版的RangeSet使用了BitSet,每個ack只佔一位。

我們可以在broker.conf中,通過配置項managedLedgerUnackedRangesOpenCacheSetEnabled=true來開啟使用優化版的RangeSet。

也正因如此,如果整個集羣的訂閲數比較多,遊標對象的數據量其實並不小。所以在Pulsar中,MetaDataStore中只保存了遊標的索引信息,即保存了遊標存儲在哪個Ledger中。真正的遊標數據會通過上面介紹的cursorLedger寫入到Bookkeeper中持久化。整個遊標對象會被寫入到一個Entry中,其Protobuf的定義如下:

message PositionInfo { required int64 ledgerId = 1; required int64 entryId = 2; repeated MessageRange individualDeletedMessages = 3; repeated LongProperty properties = 4; repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 5; }

看到這裏,其實Batch消息中單條消息確認的實現也清晰了,BatchDeletedIndexes是一個ConcurrentSkipListMap,Key為一個Position對象,對象裏面包含了LedgerId和EntryId。Value是一個BitSet,記錄了這個Batch裏面哪些消息已經被確認。BatchDeletedIndexes會和單條消息的空洞一起放在同一個對象(PositionInfo)中,最後持久化到Bookkeeper。

空洞數據如果寫入Bookkeeper失敗了,現在Pulsar還會嘗試往ZooKeeper中保存,和索引信息一起保存。但是ZooKeeper不會保存所有的數據,只會保存一小部分,儘可能的讓客户端不出現重複消費。我們可以通過broker.conf中的配置項來決定最多持久化多少數據到ZooKeeper,配置項名為:managedLedgerMaxUnackedRangesToPersistInZooKeeper,默認值是1000。

消息空洞管理的優化

空洞存儲的方案看起來已經很完美,但是在海量未確認消息的場景下還是會出現一些問題。首先是大量的訂閲會讓遊標數量暴增,導致Broker內存的佔用過大。其次,有很多空洞其實是根本沒有變化的,現在每次都要保存全量的空洞數據。最後,雖然優化版RangeSet在內存中使用了BitSet來存儲,但是實際存儲在Bookkeeper中的數據MessageRange,還是一個個由LedgerId和EntryId組成的對象,每個MessageRange佔用16字節。當空洞數量比較多時,總體體積會超過5MB,而現在Bookkeeper能寫入的單個Entry大小上限是5MB,如果超過這個閾值就會出現空洞信息持久化失敗的情況。

對於這種情況,已經有專門的PIP在解決這個問題,筆者在寫這篇文章的時候,這個PIP代碼已經提交,正在Review階段,因此下面的內容可能會和最終代碼有一定差距。

新的方案中主要使用LRU+分段存儲的方式來解決上述問題。由於遊標中空洞信息數據量可能會很大,因此內存中只保存少量熱點區間,通過LRU算法來切換冷熱數據,從而進一步壓縮內存的使用率。分段存儲主要是把空洞信息存儲到不同的Entry中去,這樣能避免超過一個Entry最大消息5MB的限制。

如果我們把空洞信息拆分為多個Entry來存儲,首先面臨的問題是索引。因為使用單個Entry記錄時,只需要讀取Ledger中最後一個Entry即可,而拆分為多個Entry後,我們不知道要讀取多少個Entry。因此,新方案中引入了Marker,如下圖所示:

當所有的Entry保存完成後,插入一個Marker,Marker是一個特殊的Entry,記錄了當前所有拆分存儲的Entry。當數據恢復時,從後往前讀,先讀出索引,然後再根據索引讀取所有的Entry。

由於存儲涉及到多個Entry,因此需要保證原子性,只要最後一個Entry讀出來不是Marker,則説明上次的保存沒有完成就中斷了,會繼續往前讀,直到找到一個完整的Marker。

空洞信息的存儲,也不需要每次全量了。以Ledger為單位,記錄每個Ledger下的數據是否有修改過,如果空洞數據被修改過會被標識為髒數據,存儲時只會保存有髒數據的部分,然後修改Marker中的索引。

假設Entry-2中存儲的空洞信息有修改,則Entry-2會被標記為髒數據,下次存儲時,只需要存儲一個Entry-2,再存儲一個Marker即可。只有當整個Ledger寫滿的情況下,才會觸發Marker中所有Entry複製到新Ledger的情況。如下圖所示:

ManagedLedger在內存中通過LinkedHashMap實現了一個LRU鏈表,會有線程定時檢查空洞信息的內存佔用是否已經達到閾值,如果達到了閾值則需要進行LRU換出,切換以Ledger為單位,把最少使用的數據從Map中移除。LRU數據的換入是同步的,當添加或者調用Contains時,發現Marker中存在這個Ledger的索引,但是內存中沒有對應的數據,則會觸發同步數據的加載。異步換出和同步換入,主要是為了讓數據儘量在內存中多待一會,避免出現頻繁的換入換出。

尾聲

Pulsar中的設計細節非常多,由於篇幅有限,作者會整理一系列的文章進行技術分享,敬請期待。如果各位希望系統性地學習Pulsar,可以購買作者出版的新書《深入解析Apache Pulsar》。