百萬級 Topic,騰訊雲的 Apache Pulsar 穩定性實踐

語言: CN / TW / HK

Apache Pulsar 作為雲原生時代訊息流系統,採用儲存計算分離架構,支援大叢集、多租戶、百萬級 Topic、跨地域資料複製、持久化儲存、分層儲存、高可擴充套件性等企業級和金融級功能。Apache Pulsar 提供了統一的消費模型,支援訊息佇列和流兩種場景,既能為佇列場景提供企業級讀寫服務質量和強一致性保障,又能為流場景提供高吞吐、低延遲。

Apache Pulsar 在騰訊雲中已經得到大規模的生產實踐,在過去一年中承接了諸多行業生態中不同的使用場景。在實際的生產實踐中,騰訊雲針對 Apache Pulsar 做了一系列的效能優化和穩定性功能方面的工作,來保障使用者在不同的場景下系統的穩定高效的執行。本文圍繞騰訊雲近一年在 Pulsar 穩定性和效能方面優化最佳實踐。

Pulsar 在騰訊雲百萬級 Topic 上的應用

為什麼選擇在生產環境中使用 Pulsar?

此前該使用者使用 Kafka 叢集來承載業務,由於業務的特定場景,叢集的整體流量相對不大,但是需要使用的 Topic 較多。此前使用 Kafka 叢集時,由於 Kafka 自身架構的限定,使用者不能在一套叢集中建立較多的 Topic,所以為了滿足業務多 Topic 的使用場景,需要部署多套 Kafka 叢集來滿足業務的使用,導致業務使用的成本較大。

Pulsar本身除了具備 Pub-Sub 的傳統 MQ 功能外,其底層架構計算儲存分離,在儲存層分層分片,可以很容易地把 BookKeeper 中的資料 offload 到廉價儲存上。Pulsar Functions 是 Serverless 的輕量化計算框架,為使用者提供了 Topic 之間中轉的能力。在開源之前,Pulsar 已在 Yahoo! 的生產環境中經歷 5 年的打磨,並且可以輕鬆擴縮容,支撐多 Topic 場景。為了降低使用的成本,同時滿足多 Topic 的業務場景,該使用者切換到了 Pulsar 的叢集上。

當前該使用者的一套Pulsar 叢集可以承載 60W 左右的 Topic,在很好地滿足了業務使用的場景的同時降低了使用成本。

Apache Pulsar 穩定性優化實踐

實踐 1:訊息空洞的影響及規避措施

使用 Shared 訂閱模式或單條 Ack 訊息模型時,使用者經常會遇到 Ack 空洞的情況。Pulsar 中單獨抽象出了 individuallyDeletedMessages 集合來記錄空洞訊息的情況。該集合是開閉區間集合,開區間表明訊息是空洞訊息,閉區間表明訊息已被處理。早期 Pulsar 支援單條 Ack 和批量 Ack 兩種模型,後者對標 Kafka 的 Ack Offset。引入單條 Ack 模型主要針對線上業務場景,但也因此帶來了 Ack 空洞問題。Ack 空洞即下圖中 individuallyDeletedMessage 所展示的集合。

如何理解 individuallyDeletedMessage?以下圖為例:

該記錄中第一個 Ledger id 是 5:1280,該集合是閉區間,說明訊息已經被 Ack;之後的 5:1281 是開區間,說明訊息沒有被 Ack。這裡就用開閉區間的形式來區分訊息是否被 Ack。

Ack 空洞的出現原因可能因為 Broker 處理失敗,源於早期版本的設計缺陷,Ack 處理沒有返回值。在 2.8.0 及以上版本中,對事務訊息支援上引入了 AckResponse 概念,支援返回值。因此在早期版本中,呼叫 Ack 後無法確保 Broker 可以正確處理 Ack 請求。第二個原因可能因為客戶端出於各種原因沒有呼叫 Ack,在生產實踐中出現較多。

為了規避 Ack 空洞,一種方法是精確計算 Backlog Size。因為在 Broker 上解析 Batch 訊息會浪費效能,在 Pulsar 中對 Batch 訊息的解析在消費者側,因此一個 Entry 可能是單條訊息也可能是 Batch 訊息的。後者情況下 Batch 內的訊息數量或形態是未知的。為此要精確計算 Backlog Size,但經過調研發現這種方法的複雜性和難度較大。

另一種方法是 Broker 的主動補償策略。因為 individuallyDeletedMessage 儲存在每一個 ManagedCursor,也就是每一個訂閱物件到 Broker 實際類中的對映。每一個訂閱都可以拿到對應的 individuallyDeletedMessage 集合,Broker 就可以主動把集合推送到客戶端,也就是主動補償。

接下來我們瞭解一下 Broker 主動補償機制,即 Backlog 策略。在瞭解補償機制之前,先要了解 Topic 可能的分佈與構成。

正常來說,生產者向 Topic 釋出訊息,消費者從 Topic 接收訊息。如上圖,紅、灰、藍色代表訊息在 Topic 中的三種形態。Pulsar 中引入了 Backlog 策略,用來描述生產者和消費者之間的 Gap。該策略提供了三種選項,包括 Producer Exception、Producer Request Hold 和 Consumer Backlog Eviction。

其中,Producer Exception 相對使用者友好,在生產環境中更加常用。當訊息堆積到一定程度,消費者處理訊息的能力不足時,Producer Exception 會通知生產者出現了問題。Producer Request Hold 原理相同,但是 Producer Request Hold 只是會讓生產者停止傳送,而不會告知其原因(即不會向業務側返回標識),使用者感知為 Producer 停止傳送訊息但是無異常丟擲。而 Consumer Backlog Eviction 則會自動丟棄最早的訊息來保證訊息持續處理,可能導致丟訊息的情況出現。

此外,還需要注意的是 Pulsar 計算 Backlog Size 的方式。上圖可以理解為一個事件流,生產者源源不斷地 append message。Pulsar 計算 Backlog Size 時,是計算從當前 MarkedDeletedPosition 的位置,到 ReadPosition 的位置之前的 Backlog Size,而後結合 Producer Exception 策略暴露出來。如果 Ack 空洞,比如 Broker 側請求失敗,或者客戶程式碼產生異常導致 Ack 永遠不會被呼叫,Backlog Size 會到達一定速率,就相當於限制生產者。上圖中,M4 和 M2 是兩條空洞訊息,出現這樣的空洞訊息時,生產者的傳送流就遲早會被打斷。

Broker 主動補償機制的實現方式如上圖。由於 individuallyDeletedMessage 記錄了所有訊息的 Ack 成功與否的狀態,就可以從中獲取 MarkedDeletedPosition 位置的訊息,開啟一個 Executor Service 定時任務,設定監聽頻率,間隔一段時間將訊息重新推送到客戶端側,實現 Broker 的主動補償,避免 Ack 空洞導致 Producer Exception 被頻繁觸發。

實踐 2:再談 TTL、Backlog 及 Retention 策略

我們先看下這三個概念:

  • TTL:表示訊息在指定時間內沒有被使用者 Ack 時會在 Broker 主動 Ack。

  • Backlog :表示生產者傳送的訊息與消費者接收訊息之間的差距。

  • Retention:表示當訊息被 Ack 之後,繼續在 Bookie 側保留多久的時間,以 Ledger 為最小操作單元。

如果 TTL 和 Retention 同時設定,那麼一條訊息的生命週期該如何計算?來看以下程式碼:

void updateCursor (ManagedCursor Impl cursor, PositionImpl newPosition) t    Pair<PositionImpl, PositionImpl> pair = cursors.cursorUpdated (cursor, newPosition);    if (pair == nulL) {        Cursor has been removed in the meantime        trimConsumedLedgersInBackground();        return;    }        PositionImplpreviousSlowestReader = pair.getLeftO);    PositionImpl currentSlowestReader = pair.getRightO);
if (previousSlowestReader.compareTo(currentSlowestReader)==0){ // The slowest consumer has not changed position. Nothing to do right now return; } //Only trigger a trimming when switching to the next Ledger if (previousSlowestReader.getLedgerId() != newPosition.getLedgerId0)) f trimConsumedLedgersInBackground(); }

複製程式碼

  • TTL:根據設定的時間(預設五分鐘)定期檢查,根據觸發的策略不斷更新 cursor 位置,處理訊息過期。

  • Retention:檢查 Ledger 的建立時間(通過元資料時間戳可以瞭解 Ledger 的生命週期)以及 Entry 的大小兩個閾值來決定是否刪除某一個 Ledger。

在以上程式碼中的最後三行中,將之前最慢的 LedgerId 與 newPosition 的 LedgerId 對比,檢查 ManagedLedger 是否發生過切換,一旦切換就呼叫 trimConsumedLedgersInBackground()。該函式方法的核心程式碼策略就是 Retention 的邏輯。

由此可知:

  • 當 TTL 時間小於 Retention 時間時,訊息的完整生命週期就是 TTL 時間 + Retention 時間;

  • 當 TTL 時間大於等於 Retention 時間,訊息的生命週期就是 TTL 時間。

這裡又引出了一個新問題:TTL 策略為什麼要選擇在 Ledger 切換的時機來觸發 Ledger 的刪除操作呢?因為 Retention 刪除 Ledger 時是以 Ledger 為最小操作單元。如果 Ledger 不切換,Retention 也不會觸發刪除。所以上述程式碼邏輯會選擇切換時機來交給 Retention 執行刪除動作。

實踐 3:延遲訊息與 TTL 的關係

在團隊曾經遇到的場景中,某使用者傳送了數十萬延遲訊息,延遲設定為十天,但 TTL 過期時間設定為五天,五天後所有延遲訊息都已被過期。我們可以從原始碼層面看一下 TTL 策略。

public boolean expireMessages(int messageTTLInSeconds) {    if (expirationCheckInProgressUpdater.compareAndSet( obj: this, FALSE, TRUE)) {        log.info("[{}][{}] Starting message expiry check, ttl= {} seconds", topicName, subName,                messageTTLInSeconds);
cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> { try { long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer()); return Messaqelmpl.isEntryExpired(messageTTLInSeconds. entryTimestamp); } catch (Exception e) { log.error("[{}][{}] Error deserializing message for expiry check", topicName, subName, e); } finally { entry<release(); } return false; }, callback: this, ctx: null); return true;

複製程式碼

public static boolean isEntryExpired(int messageTTLInSeconds, long entryTimestamp) {    return messageTTLInSeconds != 0            && (System.currentTimeMillis() >            entryTimestamp + TimeUnit.SECONDS.toMillis(messageTTLInSeconds));}

複製程式碼

TTL 的核心邏輯是通過 cursor 傳入的值決定訊息是否過期,即是否能找到 Entry。TTL 只獲取了訊息的釋出時間,卻沒有理會訊息的延遲設定。結合上面兩段程式碼,isEntryExpired 只關心 PublishedTime 時間戳元資料屬性,FindNewestMatchingEntry 物件時可以從元資料中獲取 PublishedTime。所以當延遲設定小於 TTL 時間就會導致延遲訊息被過期,在使用者側就會發現訊息丟失。

針對這一問題,騰訊團隊向社群提供了 PR( http://github.com/apache/pulsar/pull/15628),主要邏輯是分別檢查訊息的釋出時間和延遲時間,到達釋出時間後如果延遲時間大於 TTL 時間,則 TTL 時間到達後依然不能過期訊息。IsEntryExpired 會判斷並檢查 TTL 時間與延遲時間。這裡釋出時間和延遲時間要一次性從 Entry 中獲取,否則每次獲取的 Entry 物件是不一樣的。此外,延遲時間需要傳送時間點的時間戳,根據具體計算出延遲的時間長度來做判斷。

實踐 4:Admin API Block 的優化處理

在 Pulsar 之前的程式碼邏輯中:

  • 如果在非同步程式碼中頻繁呼叫同步邏輯,那麼其中的牽連關係很可能導致 Pulsar 外部的執行緒卡住,這時只能重啟對應的 Broker 節點來恢復任務。

  • Pulsar 的 Http Lookup 服務呼叫的是外部埠,一旦非同步呼叫同步導致阻塞,那麼該服務外部埠的資料流也會出現阻塞。

  • Pulsar Web 服務的效能較差,主要是因為 CompletableFuture 的誤用。當我們定義一個 CompletableFuture 物件後,經常呼叫 thenapply 或者 thencompose 來返回物件。這其實是 CompletableFuture 內物件的同步返回,是由當前執行緒棧執行的。如果非同步任務沒有返回,則由回撥執行緒執行任務。

  • Pulsar 高版本加入了 Metadata Store 執行緒池的抽象。這個抽象會增大 ZooKeeper 的壓力。當同一時間內的外部服務呼叫量增大,ZooKeeper 負載增大會導致訊息延遲等指標出現退化。

騰訊團隊針對上述問題,一方面剝離了 Metadata Store 執行緒池,另一方面通過服務監聽來定位和發現 Web 服務的效能較弱的位置,去做進一步的優化處理。此外,團隊還加入了超時處理邏輯,所有 Pulsar 外部執行緒如果在最後限定時間(30 秒)內無法處理完成就會丟擲超時。雖然單個外部執行緒超時、重啟影響不大,但這樣避免了整個資料流阻塞的情況。

實踐 5:zk-node 洩露

有時使用者正在使用的 Topic 不多,但 zk-node 數量卻很大,Pulsar 對 zk-node 的放大倍數較高。上圖拐點是 zk-node 髒資料清理的時點,可以看到 zk-node 資料洩漏的情況非常嚴重,達到 5 倍之多。

在建立一個 Topic 時,首先要在 zk-path 的六級目錄下涵蓋所有 Topic 資訊,在 ZooKeeper 上建立的資源量很大。此目錄下涵蓋了所有的 Topic,問題即出現在六個層級中。為此團隊做了以下操作來處理 zk-node 髒資料:

  • 首先通過 ZooKeeper client 讀取 zk-path,按照指定的格式拼接所有 Topic 名字,獲取 Topic 列表;

  • 通過 pulsar-admin 檢查叢集中是否存在該 Topic;如果叢集中不存在該 Topic,則相關資料一定是髒資料;(修復 zk-node 洩露問題的相關程式碼已 merge 進 2.8 + 的社群版本。)

  • 切記在清理 ZookKeeper 髒資料之前備份 ZookKeeper 資料。

實踐 6:Bookie Ledger 洩漏

團隊在實踐中發現,雖然 Retention 策略設定的訊息生命週期最長應不超過 30 天,但檢測掃描到的一些訊息已經有數百天曆史,且難以從 BookKeeper 中刪除。針對這一問題,團隊分析如下:

  • 觸發 Ledger 刪除的唯一路徑是 Retention 策略。這些訊息產生的原因只能定位到一些 Bookie CLI 命令,這些命令生成了一些 Retention 策略管控不到的 Ledger。

  • 每一個 Ledger 都有對應的 LedgerInfo,記錄了它的元資料資訊,包括建立時間等。獲取元資料後,就可以確定 Ledger 是多久前建立的,還可以確定 Ledger 具體是在哪些 Bookie 節點上。

  • 一個 Ledger 唯一歸屬於一個 Topic,所以可以獲取 Topic 中存在 Ledger 的資訊,進而確定某個 Ledger 是否存在於 Topic 的 Ledger 列表中,如果不在就是髒資料,可以清理。

  • 如果 Ledger 對應的元資料已經丟失,那麼 Ledger 本身也可以直接刪除。

  • 注意 Schema,如果忽略 Schema 可能會刪除使用者 Schema。恢復使用者 Schema 時,Schema 的 Ledger 資訊是存在 Bookie 中,Schema 自身的資訊存在 Broker 歸屬的 ZK 中。恢復時需要先把 Broker 中存在的 Schema 資訊刪除,再讓使用者嘗試使用生產端重建 Schema。

注意:執行以上操作前,切記提前備份資料。

實踐 7:Apache Pulsar 多級快取優化

如上圖,Pulsar 現有快取策略會導致明顯的毛刺現象,出現服務週期性的劇烈效能波動和使用者端的明顯感知。

try {     //We need to check all the segments, starting from the current     //backward to minimize the     //checks for recently inserted entries     int size = cacheSegments.size();     for (int i = 0; i < size; i++)         int segmentIdx = (currentSegmentIdx + (size - i)) % size;  

複製程式碼

try {    int offset = currentSegmentOffset.getAndAdd(entrySize);    if (offset + entrySize > segmentSize) {        // Rollover to next segment        currentSegmentIdx = (currentSegmentIdx + 1) % cacheSegments.size();          currentSegment0ffset. set(alignedSize);        cacheIndexes.get(currentSegmentIdx).clear();         offset = 0;}

複製程式碼

這裡騰訊團隊主要做了讀取快取的優化。在讀取快取層面,可以看到 Pulsar 在讀取快取時迭代了快取中的所有訊息,如第一段程式碼倒數第二行所示。同時,一旦 offset + entrySize 大於 segmentSize,就會清除全部快取,如第二段程式碼所示。這也就是為什麼之前會出現明顯的效能波動點的原因所在。

為此團隊使用了 OHC + LRU 的策略,避免了快取情況導致的劇烈波動,效果如下圖:

總結與展望

本文分享了騰訊雲團隊在 Apache Pulsar 穩定性上的實踐經驗,重點介紹了訊息空洞的影響及規避措施等最佳實踐,為更多開發者提供參考。同時,騰訊雲團隊也在參與社群貢獻中,和社群討論以下重要問題並探索相關解決方案,如客戶端超時時間內的重試策略,借鑑其他 MQ 的思路進行改進,嘗試在客戶端加入超時重試策略,通過多次重試機制來避免傳送失敗的情況發生;優化 Broker 和 Bookie OOM,針對 Ack 空洞對應集合無法縮容的問題進行改進;以及優化 Bookie Auto Recover,加入超時重試邏輯,避免 BookKeeper 和 ZooKeeper 之間發生 Session 超時的情況下服務重啟。

作者介紹:

冉小龍,騰訊雲高階研發工程師,Apache Pulsar Committer,RoP maintainer,Apache Pulsar Go Client、Pulsarctl 與 Go Functions 作者與主要維護者。

點選閱讀原文,關注 Apache Pulsar。

今日好文推薦

從一線研發到公司創始人,基礎軟體創業者迷霧中與市場賽跑

Azure CTO 呼籲不要使用 C/C++ 啟動新專案,C++ 之父迴應:你們這些高管就愛喜新厭舊

NGINX 侷限太多,Cloudflare 最終放棄它並用 Rust 自研了全新替代品