百萬級 Topic,騰訊雲的 Apache Pulsar 穩定性實踐

語言: CN / TW / HK

Apache Pulsar 作為雲原生時代消息流系統,採用存儲計算分離架構,支持大集羣、多租户、百萬級 Topic、跨地域數據複製、持久化存儲、分層存儲、高可擴展性等企業級和金融級功能。Apache Pulsar 提供了統一的消費模型,支持消息隊列和流兩種場景,既能為隊列場景提供企業級讀寫服務質量和強一致性保障,又能為流場景提供高吞吐、低延遲。

Apache Pulsar 在騰訊雲中已經得到大規模的生產實踐,在過去一年中承接了諸多行業生態中不同的使用場景。在實際的生產實踐中,騰訊雲針對 Apache Pulsar 做了一系列的性能優化和穩定性功能方面的工作,來保障用户在不同的場景下系統的穩定高效的運行。本文圍繞騰訊雲近一年在 Pulsar 穩定性和性能方面優化最佳實踐。

Pulsar 在騰訊雲百萬級 Topic 上的應用

為什麼選擇在生產環境中使用 Pulsar?

此前該用户使用 Kafka 集羣來承載業務,由於業務的特定場景,集羣的整體流量相對不大,但是需要使用的 Topic 較多。此前使用 Kafka 集羣時,由於 Kafka 自身架構的限定,用户不能在一套集羣中創建較多的 Topic,所以為了滿足業務多 Topic 的使用場景,需要部署多套 Kafka 集羣來滿足業務的使用,導致業務使用的成本較大。

Pulsar本身除了具備 Pub-Sub 的傳統 MQ 功能外,其底層架構計算存儲分離,在存儲層分層分片,可以很容易地把 BookKeeper 中的數據 offload 到廉價存儲上。Pulsar Functions 是 Serverless 的輕量化計算框架,為用户提供了 Topic 之間中轉的能力。在開源之前,Pulsar 已在 Yahoo! 的生產環境中經歷 5 年的打磨,並且可以輕鬆擴縮容,支撐多 Topic 場景。為了降低使用的成本,同時滿足多 Topic 的業務場景,該用户切換到了 Pulsar 的集羣上。

當前該用户的一套Pulsar 集羣可以承載 60W 左右的 Topic,在很好地滿足了業務使用的場景的同時降低了使用成本。

Apache Pulsar 穩定性優化實踐

實踐 1:消息空洞的影響及規避措施

使用 Shared 訂閲模式或單條 Ack 消息模型時,用户經常會遇到 Ack 空洞的情況。Pulsar 中單獨抽象出了 individuallyDeletedMessages 集合來記錄空洞消息的情況。該集合是開閉區間集合,開區間表明消息是空洞消息,閉區間表明消息已被處理。早期 Pulsar 支持單條 Ack 和批量 Ack 兩種模型,後者對標 Kafka 的 Ack Offset。引入單條 Ack 模型主要針對在線業務場景,但也因此帶來了 Ack 空洞問題。Ack 空洞即下圖中 individuallyDeletedMessage 所展示的集合。

如何理解 individuallyDeletedMessage?以下圖為例:

該記錄中第一個 Ledger id 是 5:1280,該集合是閉區間,説明消息已經被 Ack;之後的 5:1281 是開區間,説明消息沒有被 Ack。這裏就用開閉區間的形式來區分消息是否被 Ack。

Ack 空洞的出現原因可能因為 Broker 處理失敗,源於早期版本的設計缺陷,Ack 處理沒有返回值。在 2.8.0 及以上版本中,對事務消息支持上引入了 AckResponse 概念,支持返回值。因此在早期版本中,調用 Ack 後無法確保 Broker 可以正確處理 Ack 請求。第二個原因可能因為客户端出於各種原因沒有調用 Ack,在生產實踐中出現較多。

為了規避 Ack 空洞,一種方法是精確計算 Backlog Size。因為在 Broker 上解析 Batch 消息會浪費性能,在 Pulsar 中對 Batch 消息的解析在消費者側,因此一個 Entry 可能是單條消息也可能是 Batch 消息的。後者情況下 Batch 內的消息數量或形態是未知的。為此要精確計算 Backlog Size,但經過調研發現這種方法的複雜性和難度較大。

另一種方法是 Broker 的主動補償策略。因為 individuallyDeletedMessage 存儲在每一個 ManagedCursor,也就是每一個訂閲對象到 Broker 實際類中的映射。每一個訂閲都可以拿到對應的 individuallyDeletedMessage 集合,Broker 就可以主動把集合推送到客户端,也就是主動補償。

接下來我們瞭解一下 Broker 主動補償機制,即 Backlog 策略。在瞭解補償機制之前,先要了解 Topic 可能的分佈與構成。

正常來説,生產者向 Topic 發佈消息,消費者從 Topic 接收消息。如上圖,紅、灰、藍色代表消息在 Topic 中的三種形態。Pulsar 中引入了 Backlog 策略,用來描述生產者和消費者之間的 Gap。該策略提供了三種選項,包括 Producer Exception、Producer Request Hold 和 Consumer Backlog Eviction。

其中,Producer Exception 相對用户友好,在生產環境中更加常用。當消息堆積到一定程度,消費者處理消息的能力不足時,Producer Exception 會通知生產者出現了問題。Producer Request Hold 原理相同,但是 Producer Request Hold 只是會讓生產者停止發送,而不會告知其原因(即不會向業務側返回標識),用户感知為 Producer 停止發送消息但是無異常拋出。而 Consumer Backlog Eviction 則會自動丟棄最早的消息來保證消息持續處理,可能導致丟消息的情況出現。

此外,還需要注意的是 Pulsar 計算 Backlog Size 的方式。上圖可以理解為一個事件流,生產者源源不斷地 append message。Pulsar 計算 Backlog Size 時,是計算從當前 MarkedDeletedPosition 的位置,到 ReadPosition 的位置之前的 Backlog Size,而後結合 Producer Exception 策略暴露出來。如果 Ack 空洞,比如 Broker 側請求失敗,或者客户代碼產生異常導致 Ack 永遠不會被調用,Backlog Size 會到達一定速率,就相當於限制生產者。上圖中,M4 和 M2 是兩條空洞消息,出現這樣的空洞消息時,生產者的發送流就遲早會被打斷。

Broker 主動補償機制的實現方式如上圖。由於 individuallyDeletedMessage 記錄了所有消息的 Ack 成功與否的狀態,就可以從中獲取 MarkedDeletedPosition 位置的消息,開啟一個 Executor Service 定時任務,設置監聽頻率,間隔一段時間將消息重新推送到客户端側,實現 Broker 的主動補償,避免 Ack 空洞導致 Producer Exception 被頻繁觸發。

實踐 2:再談 TTL、Backlog 及 Retention 策略

我們先看下這三個概念:

  • TTL:表示消息在指定時間內沒有被用户 Ack 時會在 Broker 主動 Ack。

  • Backlog :表示生產者發送的消息與消費者接收消息之間的差距。

  • Retention:表示當消息被 Ack 之後,繼續在 Bookie 側保留多久的時間,以 Ledger 為最小操作單元。

如果 TTL 和 Retention 同時設置,那麼一條消息的生命週期該如何計算?來看以下代碼:

void updateCursor (ManagedCursor Impl cursor, PositionImpl newPosition) t    Pair<PositionImpl, PositionImpl> pair = cursors.cursorUpdated (cursor, newPosition);    if (pair == nulL) {        Cursor has been removed in the meantime        trimConsumedLedgersInBackground();        return;    }        PositionImplpreviousSlowestReader = pair.getLeftO);    PositionImpl currentSlowestReader = pair.getRightO);
if (previousSlowestReader.compareTo(currentSlowestReader)==0){ // The slowest consumer has not changed position. Nothing to do right now return; } //Only trigger a trimming when switching to the next Ledger if (previousSlowestReader.getLedgerId() != newPosition.getLedgerId0)) f trimConsumedLedgersInBackground(); }

複製代碼

  • TTL:根據設置的時間(默認五分鐘)定期檢查,根據觸發的策略不斷更新 cursor 位置,處理消息過期。

  • Retention:檢查 Ledger 的創建時間(通過元數據時間戳可以瞭解 Ledger 的生命週期)以及 Entry 的大小兩個閾值來決定是否刪除某一個 Ledger。

在以上代碼中的最後三行中,將之前最慢的 LedgerId 與 newPosition 的 LedgerId 對比,檢查 ManagedLedger 是否發生過切換,一旦切換就調用 trimConsumedLedgersInBackground()。該函數方法的核心代碼策略就是 Retention 的邏輯。

由此可知:

  • 當 TTL 時間小於 Retention 時間時,消息的完整生命週期就是 TTL 時間 + Retention 時間;

  • 當 TTL 時間大於等於 Retention 時間,消息的生命週期就是 TTL 時間。

這裏又引出了一個新問題:TTL 策略為什麼要選擇在 Ledger 切換的時機來觸發 Ledger 的刪除操作呢?因為 Retention 刪除 Ledger 時是以 Ledger 為最小操作單元。如果 Ledger 不切換,Retention 也不會觸發刪除。所以上述代碼邏輯會選擇切換時機來交給 Retention 執行刪除動作。

實踐 3:延遲消息與 TTL 的關係

在團隊曾經遇到的場景中,某用户發送了數十萬延遲消息,延遲設置為十天,但 TTL 過期時間設置為五天,五天後所有延遲消息都已被過期。我們可以從源碼層面看一下 TTL 策略。

public boolean expireMessages(int messageTTLInSeconds) {    if (expirationCheckInProgressUpdater.compareAndSet( obj: this, FALSE, TRUE)) {        log.info("[{}][{}] Starting message expiry check, ttl= {} seconds", topicName, subName,                messageTTLInSeconds);
cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> { try { long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer()); return Messaqelmpl.isEntryExpired(messageTTLInSeconds. entryTimestamp); } catch (Exception e) { log.error("[{}][{}] Error deserializing message for expiry check", topicName, subName, e); } finally { entry<release(); } return false; }, callback: this, ctx: null); return true;

複製代碼

public static boolean isEntryExpired(int messageTTLInSeconds, long entryTimestamp) {    return messageTTLInSeconds != 0            && (System.currentTimeMillis() >            entryTimestamp + TimeUnit.SECONDS.toMillis(messageTTLInSeconds));}

複製代碼

TTL 的核心邏輯是通過 cursor 傳入的值決定消息是否過期,即是否能找到 Entry。TTL 只獲取了消息的發佈時間,卻沒有理會消息的延遲設置。結合上面兩段代碼,isEntryExpired 只關心 PublishedTime 時間戳元數據屬性,FindNewestMatchingEntry 對象時可以從元數據中獲取 PublishedTime。所以當延遲設置小於 TTL 時間就會導致延遲消息被過期,在用户側就會發現消息丟失。

針對這一問題,騰訊團隊向社區提供了 PR( http://github.com/apache/pulsar/pull/15628),主要邏輯是分別檢查消息的發佈時間和延遲時間,到達發佈時間後如果延遲時間大於 TTL 時間,則 TTL 時間到達後依然不能過期消息。IsEntryExpired 會判斷並檢查 TTL 時間與延遲時間。這裏發佈時間和延遲時間要一次性從 Entry 中獲取,否則每次獲取的 Entry 對象是不一樣的。此外,延遲時間需要發送時間點的時間戳,根據具體計算出延遲的時間長度來做判斷。

實踐 4:Admin API Block 的優化處理

在 Pulsar 之前的代碼邏輯中:

  • 如果在異步代碼中頻繁調用同步邏輯,那麼其中的牽連關係很可能導致 Pulsar 外部的線程卡住,這時只能重啟對應的 Broker 節點來恢復任務。

  • Pulsar 的 Http Lookup 服務調用的是外部端口,一旦異步調用同步導致阻塞,那麼該服務外部端口的數據流也會出現阻塞。

  • Pulsar Web 服務的性能較差,主要是因為 CompletableFuture 的誤用。當我們定義一個 CompletableFuture 對象後,經常調用 thenapply 或者 thencompose 來返回對象。這其實是 CompletableFuture 內對象的同步返回,是由當前線程棧執行的。如果異步任務沒有返回,則由回調線程執行任務。

  • Pulsar 高版本加入了 Metadata Store 線程池的抽象。這個抽象會增大 ZooKeeper 的壓力。當同一時間內的外部服務調用量增大,ZooKeeper 負載增大會導致消息延遲等指標出現退化。

騰訊團隊針對上述問題,一方面剝離了 Metadata Store 線程池,另一方面通過服務監聽來定位和發現 Web 服務的性能較弱的位置,去做進一步的優化處理。此外,團隊還加入了超時處理邏輯,所有 Pulsar 外部線程如果在最後限定時間(30 秒)內無法處理完成就會拋出超時。雖然單個外部線程超時、重啟影響不大,但這樣避免了整個數據流阻塞的情況。

實踐 5:zk-node 泄露

有時用户正在使用的 Topic 不多,但 zk-node 數量卻很大,Pulsar 對 zk-node 的放大倍數較高。上圖拐點是 zk-node 髒數據清理的時點,可以看到 zk-node 數據泄漏的情況非常嚴重,達到 5 倍之多。

在創建一個 Topic 時,首先要在 zk-path 的六級目錄下涵蓋所有 Topic 信息,在 ZooKeeper 上創建的資源量很大。此目錄下涵蓋了所有的 Topic,問題即出現在六個層級中。為此團隊做了以下操作來處理 zk-node 髒數據:

  • 首先通過 ZooKeeper client 讀取 zk-path,按照指定的格式拼接所有 Topic 名字,獲取 Topic 列表;

  • 通過 pulsar-admin 檢查集羣中是否存在該 Topic;如果集羣中不存在該 Topic,則相關數據一定是髒數據;(修復 zk-node 泄露問題的相關代碼已 merge 進 2.8 + 的社區版本。)

  • 切記在清理 ZookKeeper 髒數據之前備份 ZookKeeper 數據。

實踐 6:Bookie Ledger 泄漏

團隊在實踐中發現,雖然 Retention 策略設置的消息生命週期最長應不超過 30 天,但檢測掃描到的一些消息已經有數百天曆史,且難以從 BookKeeper 中刪除。針對這一問題,團隊分析如下:

  • 觸發 Ledger 刪除的唯一路徑是 Retention 策略。這些消息產生的原因只能定位到一些 Bookie CLI 命令,這些命令生成了一些 Retention 策略管控不到的 Ledger。

  • 每一個 Ledger 都有對應的 LedgerInfo,記錄了它的元數據信息,包括創建時間等。獲取元數據後,就可以確定 Ledger 是多久前創建的,還可以確定 Ledger 具體是在哪些 Bookie 節點上。

  • 一個 Ledger 唯一歸屬於一個 Topic,所以可以獲取 Topic 中存在 Ledger 的信息,進而確定某個 Ledger 是否存在於 Topic 的 Ledger 列表中,如果不在就是髒數據,可以清理。

  • 如果 Ledger 對應的元數據已經丟失,那麼 Ledger 本身也可以直接刪除。

  • 注意 Schema,如果忽略 Schema 可能會刪除用户 Schema。恢復用户 Schema 時,Schema 的 Ledger 信息是存在 Bookie 中,Schema 自身的信息存在 Broker 歸屬的 ZK 中。恢復時需要先把 Broker 中存在的 Schema 信息刪除,再讓用户嘗試使用生產端重建 Schema。

注意:執行以上操作前,切記提前備份數據。

實踐 7:Apache Pulsar 多級緩存優化

如上圖,Pulsar 現有緩存策略會導致明顯的毛刺現象,出現服務週期性的劇烈性能波動和用户端的明顯感知。

try {     //We need to check all the segments, starting from the current     //backward to minimize the     //checks for recently inserted entries     int size = cacheSegments.size();     for (int i = 0; i < size; i++)         int segmentIdx = (currentSegmentIdx + (size - i)) % size;  

複製代碼

try {    int offset = currentSegmentOffset.getAndAdd(entrySize);    if (offset + entrySize > segmentSize) {        // Rollover to next segment        currentSegmentIdx = (currentSegmentIdx + 1) % cacheSegments.size();          currentSegment0ffset. set(alignedSize);        cacheIndexes.get(currentSegmentIdx).clear();         offset = 0;}

複製代碼

這裏騰訊團隊主要做了讀取緩存的優化。在讀取緩存層面,可以看到 Pulsar 在讀取緩存時迭代了緩存中的所有消息,如第一段代碼倒數第二行所示。同時,一旦 offset + entrySize 大於 segmentSize,就會清除全部緩存,如第二段代碼所示。這也就是為什麼之前會出現明顯的性能波動點的原因所在。

為此團隊使用了 OHC + LRU 的策略,避免了緩存情況導致的劇烈波動,效果如下圖:

總結與展望

本文分享了騰訊雲團隊在 Apache Pulsar 穩定性上的實踐經驗,重點介紹了消息空洞的影響及規避措施等最佳實踐,為更多開發者提供參考。同時,騰訊雲團隊也在參與社區貢獻中,和社區討論以下重要問題並探索相關解決方案,如客户端超時時間內的重試策略,借鑑其他 MQ 的思路進行改進,嘗試在客户端加入超時重試策略,通過多次重試機制來避免發送失敗的情況發生;優化 Broker 和 Bookie OOM,針對 Ack 空洞對應集合無法縮容的問題進行改進;以及優化 Bookie Auto Recover,加入超時重試邏輯,避免 BookKeeper 和 ZooKeeper 之間發生 Session 超時的情況下服務重啟。

作者介紹:

冉小龍,騰訊雲高級研發工程師,Apache Pulsar Committer,RoP maintainer,Apache Pulsar Go Client、Pulsarctl 與 Go Functions 作者與主要維護者。

點擊閲讀原文,關注 Apache Pulsar。

今日好文推薦

從一線研發到公司創始人,基礎軟件創業者迷霧中與市場賽跑

Azure CTO 呼籲不要使用 C/C++ 啟動新項目,C++ 之父迴應:你們這些高管就愛喜新厭舊

NGINX 侷限太多,Cloudflare 最終放棄它並用 Rust 自研了全新替代品