深入解析Apache Pulsar系列(二) —— Broker訊息確認的管理

語言: CN / TW / HK

作者簡介

林琳 騰訊雲中間件專家工程師 Apache Pulsar PMC,《深入解析Apache Pulsar》作者。目前專注於中介軟體領域,在訊息佇列和微服務方向具有豐富的經驗。負責TDMQ的設計與開發工作,目前致力於打造穩定、高效和可擴充套件的基礎元件與服務。

導語

我們在之前的《深入解析Apache Pulsar系列之一 —— 客戶端訊息確認》中介紹過Apache Pulsar客戶端的多種訊息確認模式。這篇文章中,我們將介紹Broker側對於訊息確認的管理。

客戶端通過訊息確認機制通知Broker某些訊息已經被消費,後續不要再重複推送。Broker側則使用遊標來儲存當前訂閱的消費位置資訊,包含了消費位置中的所有元資料,避免Broker重啟後,消費者要從頭消費的問題。Pulsar中的訂閱分為持久訂閱和非持久訂閱,他們之間的區別是:持久訂閱的遊標(Cursor)是持久化的,元資料會儲存在ZooKeeper,而非持久化遊標只儲存在Broker的記憶體中。

遊標的簡介

Pulsar中每個訂閱都會包含一個遊標,如果多個消費者擁有相同的訂閱名(消費組),那這些消費者們會共享一個遊標。遊標的共享又和消費者的消費模式有關,如果是Exclusive或者FailOver模式的訂閱,那同一時間只有一個消費者使用這個遊標。如果是Shared或者Key_Shared模式的訂閱,那多個消費者會同時使用這個遊標。

每當消費者Ack一條訊息,遊標中指標的位置都有可能會變化,為什麼說是有可能呢?這涉及到我們在客戶端章節介紹的Acknowledge的方式:單條訊息確認(Acknowledge)、批訊息中的單個訊息確認(Acknowledge)、累積訊息確認(AcknowledgeCumulative)。否定應答(NegativeAcknowledge)不涉及遊標的變化,因此不在討論範圍之內。 我們先看單條訊息的確認,如果是獨佔式的消費,每確認一條訊息,遊標位置都會往後移動一個Entry,如下圖所示:

累積訊息確認,只需要確認一條訊息,遊標可以往後移動多個Entry,如:Consumer-1累積確認了Entry-4,則從0開始的Entry都會被確認,如下圖所示:

對於共享式的消費,因為有多個消費者同時消費訊息,因此訊息的確認可能會出現空洞,空洞如下圖所示:

這裡也解釋了為什麼MarkeDeletePosition指標的位置可能發生變化,我們可以從共享式的消費中看到,訊息確認是可能出現空洞的,只有當前面所有的Entry都被消費並確認,MarkeDeletePosition指標才會移動。如果存在空洞,MarkeDeletePosition指標是不會往後移動的。那這個MarkeDeletePosition指標和遊標是什麼關係呢?遊標是一個物件,裡面包含了多個屬性,MarkeDeletePosition指標只是遊標的其中一個屬性。正如上面所說的Ack空洞,在遊標中有另外專門的方式進行儲存。如果我們不單獨儲存空洞,那Broker重啟後,消費者只能從MarkDeletePosition開始消費,會存在重複消費的問題。如果以上圖為例,Broker重啟後Entry-4就會被重複消費。當然,Pulsar中對空洞資訊是有單獨儲存的。

然後,我們看看遊標裡到底記錄了什麼元資料,此處只列出一些關鍵的屬性:

屬性名 描述
bookkeeper Bookkeeper Client的引用,主要用來開啟Ledger,例如:讀取歷史資料,可以開啟已經關閉的Ledger;當前Ledger已經寫滿,開啟一個新的Ledger
markDeletePosition 標記可刪除的位置,在這個位置之前的所有Entry都已經被確認了,因此這個位置之前的訊息都是可刪除狀態
persistentMarkDeletePosition markDeletePosition是非同步持久化的,這個屬性記錄了當前已經持久化的markDeletePosition。當markDeletePosition不可用時,會以這個位置為準。這個位置會在遊標recovery時初始化,後續在持久化成功後不斷更新
readPosition 訂閱當前讀的位置,即使有多個消費者,讀的位置肯定是嚴格有序的,只不過訊息會分給不同的消費者而已。讀取的位置會在遊標恢復(recovery)時初始化,消費時會不斷更新
lastMarkDeleteEntry 最後被標記為刪除的Entry,即markDeletePosition指向的Entry
cursorLedger cursor在Zookeeper中只會儲存索引資訊,具體的Ack資料會比較大,因此會儲存到Bookkeeper中,這個屬性持有了對應Ledger的引用
individualDeletedMessages 用於儲存Ack的空洞資訊
batchDeletedIndexes 用於儲存批量訊息中單條訊息Ack資訊

看到CursorLedger,說明資料被儲存到了Bookkeeper中。有的讀者可能會有疑問,既然資料都儲存到Bookkeeper中了,那ZooKeeper中儲存的Cursor資訊有什麼用呢?我們可以認為ZooKeeper中儲存的遊標資訊只是索引,包含了以下幾個屬性:

  • 當前的cursorLedger名以及ID,用於開啟Bookkeeper中的Ledger;
  • LastMarkDeleteEntry,最後被標記為刪除的Entry資訊,裡面包含了LedgerId和EntryId;
  • 遊標最後的活動時間戳。

遊標儲存到ZooKeeper的時機有幾個:

  • 當cursor被關閉時;
  • 當發生Ledger切換導致cursorLedger變化時;
  • 當持久化空洞資料到Bookkeeper失敗並嘗試持久化空洞資料到ZooKeeper時。

我們可以把ZooKeeper中的遊標資訊看作Check Point,當恢復資料時,會先從ZooKeeper中恢復元資料,獲取到Bookkeeper Ledger資訊,然後再通過Ledger恢復最新的LastMarkDeleteEntry位置和空洞資訊。

既然遊標不會實時往ZooKeeper中寫入資料,那是如何保證消費位置不丟失的呢?

Bookkeeper中的一個Ledger能寫很多的Entry,因此高頻的儲存操作都由Bookkeeper來承擔了,ZooKeeper只負責儲存低頻的索引更新。

訊息空洞的管理

在遊標物件中,使用了一個IndividualDeletedMessages容器來儲存所有的空洞資訊。得益於Java中豐富的輪子生態,Broker中直接使用了Guava Range這個庫來實現空洞的儲存。舉個例子,假設在Ledger-1中我們的空洞如下:

則我們儲存的空洞資訊如下,即會用區間來表示已經連續Ack的範圍: [ (1:-1, 1:2] , (1:3, 1:6] ]

使用區間的好處是,可以用很少的區間數來表示整個Ledger的空洞情況,而不需要每個Entry都記錄。當某個範圍都已經被消費且確認了,會出現兩個區間合併為一個區間,這都是Guava Range自動支援的能力。如果從當前MarkDeletePosition指標的位置到後面某個Entry為止,都連成了一個區間,則MarkDeletePosition指標就可以往後移動了。

記錄了這些訊息空洞之後,是如何用來避免訊息重複消費的呢?

當Broker從Ledger中讀取到訊息後,會進入一個清洗階段,如:過濾掉延遲訊息等等。在這個階段,Broker會遍歷所有訊息,看訊息是否存在於Range裡,如果存在,則說明已經被確認過了,這條訊息會被過濾掉,不再推送給客戶端。Guava Range提供了Contains介面,可以快速檢視某個位置是否落在區間裡。這種Entry需要被過濾的場景,基本上只會出現在Broker重啟後,此時遊標資訊剛恢復。當ReadPosition超過了這段空洞的位置時,就不會出現讀到重複訊息要被過濾的情況了。

然後,我們來看看IndividualDeletedMessages這個容器的實現。

IndividualDeletedMessages 的型別是LongPairRangeSet,預設實現是DefaultRangeSet,是一個基於Google Guava Range包裝的實現類。另外一個Pulsar自己實現的優化版:ConcurrentOpenLongPairRangeSet。優化版的RangeSet和Guava Range的儲存方式有些不一樣,Guava Range使用區間來記錄資料,優化版RangeSet對外提供的介面也是Range,但是內部使用了BitSet來記錄每個Entry是否被確認。

優化版RangeSet在空洞較多的情況下對記憶體更加友好。我們可以假設一個場景,有100W的訊息被拉取,但是隻有50W的訊息已經被Ack,並且每隔一條訊息Ack一條,這樣就會出現50W個空洞。此時的Range就無法發揮區間的優勢了,會出現50W個Range物件,如下圖所示。而優化版的RangeSet使用了BitSet,每個ack只佔一位。

我們可以在broker.conf中,通過配置項managedLedgerUnackedRangesOpenCacheSetEnabled=true來開啟使用優化版的RangeSet。

也正因如此,如果整個叢集的訂閱數比較多,遊標物件的資料量其實並不小。所以在Pulsar中,MetaDataStore中只儲存了遊標的索引資訊,即儲存了遊標儲存在哪個Ledger中。真正的遊標資料會通過上面介紹的cursorLedger寫入到Bookkeeper中持久化。整個遊標物件會被寫入到一個Entry中,其Protobuf的定義如下:

message PositionInfo { required int64 ledgerId = 1; required int64 entryId = 2; repeated MessageRange individualDeletedMessages = 3; repeated LongProperty properties = 4; repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 5; }

看到這裡,其實Batch訊息中單條訊息確認的實現也清晰了,BatchDeletedIndexes是一個ConcurrentSkipListMap,Key為一個Position物件,物件裡面包含了LedgerId和EntryId。Value是一個BitSet,記錄了這個Batch裡面哪些訊息已經被確認。BatchDeletedIndexes會和單條訊息的空洞一起放在同一個物件(PositionInfo)中,最後持久化到Bookkeeper。

空洞資料如果寫入Bookkeeper失敗了,現在Pulsar還會嘗試往ZooKeeper中儲存,和索引資訊一起儲存。但是ZooKeeper不會儲存所有的資料,只會儲存一小部分,儘可能的讓客戶端不出現重複消費。我們可以通過broker.conf中的配置項來決定最多持久化多少資料到ZooKeeper,配置項名為:managedLedgerMaxUnackedRangesToPersistInZooKeeper,預設值是1000。

訊息空洞管理的優化

空洞儲存的方案看起來已經很完美,但是在海量未確認訊息的場景下還是會出現一些問題。首先是大量的訂閱會讓遊標數量暴增,導致Broker記憶體的佔用過大。其次,有很多空洞其實是根本沒有變化的,現在每次都要儲存全量的空洞資料。最後,雖然優化版RangeSet在記憶體中使用了BitSet來儲存,但是實際儲存在Bookkeeper中的資料MessageRange,還是一個個由LedgerId和EntryId組成的物件,每個MessageRange佔用16位元組。當空洞數量比較多時,總體體積會超過5MB,而現在Bookkeeper能寫入的單個Entry大小上限是5MB,如果超過這個閾值就會出現空洞資訊持久化失敗的情況。

對於這種情況,已經有專門的PIP在解決這個問題,筆者在寫這篇文章的時候,這個PIP程式碼已經提交,正在Review階段,因此下面的內容可能會和最終程式碼有一定差距。

新的方案中主要使用LRU+分段儲存的方式來解決上述問題。由於遊標中空洞資訊資料量可能會很大,因此記憶體中只儲存少量熱點區間,通過LRU演算法來切換冷熱資料,從而進一步壓縮記憶體的使用率。分段儲存主要是把空洞資訊儲存到不同的Entry中去,這樣能避免超過一個Entry最大訊息5MB的限制。

如果我們把空洞資訊拆分為多個Entry來儲存,首先面臨的問題是索引。因為使用單個Entry記錄時,只需要讀取Ledger中最後一個Entry即可,而拆分為多個Entry後,我們不知道要讀取多少個Entry。因此,新方案中引入了Marker,如下圖所示:

當所有的Entry儲存完成後,插入一個Marker,Marker是一個特殊的Entry,記錄了當前所有拆分儲存的Entry。當資料恢復時,從後往前讀,先讀出索引,然後再根據索引讀取所有的Entry。

由於儲存涉及到多個Entry,因此需要保證原子性,只要最後一個Entry讀出來不是Marker,則說明上次的儲存沒有完成就中斷了,會繼續往前讀,直到找到一個完整的Marker。

空洞資訊的儲存,也不需要每次全量了。以Ledger為單位,記錄每個Ledger下的資料是否有修改過,如果空洞資料被修改過會被標識為髒資料,儲存時只會儲存有髒資料的部分,然後修改Marker中的索引。

假設Entry-2中儲存的空洞資訊有修改,則Entry-2會被標記為髒資料,下次儲存時,只需要儲存一個Entry-2,再儲存一個Marker即可。只有當整個Ledger寫滿的情況下,才會觸發Marker中所有Entry複製到新Ledger的情況。如下圖所示:

ManagedLedger在記憶體中通過LinkedHashMap實現了一個LRU連結串列,會有執行緒定時檢查空洞資訊的記憶體佔用是否已經達到閾值,如果達到了閾值則需要進行LRU換出,切換以Ledger為單位,把最少使用的資料從Map中移除。LRU資料的換入是同步的,當新增或者呼叫Contains時,發現Marker中存在這個Ledger的索引,但是記憶體中沒有對應的資料,則會觸發同步資料的載入。非同步換出和同步換入,主要是為了讓資料儘量在記憶體中多待一會,避免出現頻繁的換入換出。

尾聲

Pulsar中的設計細節非常多,由於篇幅有限,作者會整理一系列的文章進行技術分享,敬請期待。如果各位希望系統性地學習Pulsar,可以購買作者出版的新書《深入解析Apache Pulsar》。