構建下一代萬億級雲原生訊息架構:Apache Pulsar 在 vivo 的探索與實踐

語言: CN / TW / HK

vivo網際網路大資料團隊 - Chen Jianbo、Quan Limin


本文整理自 vivo 網際網路大資料團隊在 Apache Pulsar Meetup 上的演講《Apache Pulsar 在 vivo 的探索與實踐》,介紹 vivo 在叢集管理與監控上應用 Pulsar 的實踐。


vivo 移動網際網路為全球 4 億 + 智慧手機使用者提供網際網路產品與服務。其中,vivo 分散式訊息中介軟體團隊主要為 vivo 所有內外銷實時計算業務提供高吞吐、低延時的資料接入、訊息佇列等服務,覆蓋應用商店、短影片、廣告等業務。業務叢集已達每天十萬億級的資料規模。

圖 1. vivo 分散式訊息中介軟體系統架構

上圖為系統的整體架構,其中資料接入層包括資料接入、資料採集服務,支援 SDK 直連;訊息中介軟體由 Kafka 和 Pulsar 共同承擔,其中 Pulsar 的承載量達到千億級別;資料處理部分使用 Flink、Spark 等元件。


目前,Kafka 採用多叢集方式,根據不同的業務量級、重要性分別使用不同的叢集提供服務,比如計費叢集、搜尋叢集、日誌叢集。在 Kafka 叢集的內部,則採用物理隔離的方式,根據不同業務的重要性,將不同業務的 Topic 控制在不同的資源組內,避免業務之間相互影響。

圖 2. Kafka 叢集資源隔離

圖 3. Kafka 叢集流量均衡

資源組內部則會針對 Topic 流量、分割槽分佈、磁碟容量、機器機架等指標生成遷移計劃進行流量均衡,以此增強 Kafka 可靠性。目前 Kafka 已在多叢集部署、資源隔離、流量均衡三個方面保障了基本的穩定性和資源利用率,但是在此之外,系統仍存在一些問題。


一、應對業務流量數十倍增長,引入 Apache Pulsar


過去幾年來,Kafka 叢集承載的業務量迅速增長,流量上漲數十倍,帶來諸多問題:


  • Topic 及 Topic 分割槽總量不斷增加,叢集效能受到影響:Kafka 高效能依賴於磁碟的順序讀寫,磁碟上大量分割槽導致隨機讀寫加重;


  • 業務流量增加迅速,存量叢集變大,需要將老的業務進行資源組隔離遷移或者叢集拆分。無論是資源組隔離還是叢集的隔離的方式,由於叢集不可以進行動態擴縮容,機器不能夠進行靈活調配,都存在利用率不高、運維成本增加的問題;


  • 機器擴容慢,需要做長時間流量均衡,難以應對突發流量。叢集規模越大,問題越突出;

  • 消費端效能擴充套件太依賴分割槽擴容,導致叢集元資料瘋狂增長;

  • 叢集數量對應的機器基數大,硬體故障概率高,出現硬體故障時影響會直接傳導到客戶端,缺少中間層容錯。


面對龐大的叢集、流量和多樣化的業務場景,綜合考慮叢集的穩定性和維護成本等因素,vivo 需要一個功能更豐富、適用更多場景、擴充套件能力更強的訊息元件。


Pulsar 如何解決 vivo 存在的問題,可以首先看一下 Pulsar 的架構設計。Pulsar 採用計算儲存層分離架構。計算層的 Broker 節點是對等且無狀態的,可以快速擴充套件;儲存層使用 BookKeeper 作為節點,同樣節點對等。這種分離架構支援計算和儲存層各自獨立擴充套件。

圖 4. Pulsar 儲存計算分離

其次,Pulsar 的各個節點都是輕量化的,在出現故障和宕機時可以快速恢復。一般情況下可以通過快速上下線來解決某個節點機器的問題。同時 Broker 層可以作為 BookKeeper 層的容錯層,可以防止故障直接傳導至使用者端。


Pulsar 擴容時無需長時間的資料遷移,且支援實時均衡。Broker 層抽象了 Bundle 概念,可以用有限的 Bundle 對映海量 Topic,Topic 可以隨著 Bundle 遷移,通過動態遷移 Bundle 可以更好地應對流量突發場景。BookKeeper 分層分片的架構讓資料分佈均勻,在 Broker 層有一個選擇機制,在擴容時可以將資料寫入儲存量小的節點,擴容時無需資料遷移,提供更好的流量高峰應對能力。Bookie 進行資料刷盤時會對批量資料自動進行資料排序,可以避免 Kafka 中的隨機讀寫。


Pulsar 提供了四種訊息模型:Exclusive、Failover、Shared 和 Key_Shared,其中 Shared 模型允許一個分割槽同時被多個消費例項訂閱消費,並採用 Round Robin(輪詢)方式將資料推送到各個消費例項。因此消費能力的擴充套件不會過於依賴分割槽擴容,慢消費的消費例項也可以在 Shared 模型中得到解決。Key_Shared 模型則是在 Shared 的基礎上對應對順序性有要求的場景,可以按照 Key 來消費。

圖 5. Pulsar 訂閱模型

Pulsar 的設計架構帶來了海量分割槽支撐、消費擴充套件、精準限流、流量均衡、快速擴縮容、故障恢復、分層儲存、雲原生容器部署、異地多活等特性和優勢,可以幫助叢集更好地實現高可用、高擴充套件,提高了更高的彈性。



二、Apache Pulsar 叢集管理實踐


下面我們從流量控制和資料管理方面,分享 vivo 在使用 Pulsar 過程中的叢集管理經驗。


2.1 Bundle 的管理


在叢集流量控制層面,比較關鍵的一點就是 Bundle 的管理。Bundle 負責控制使用者流量到 Broker 的具體分佈。Broker 與 Topic 之間沒有直接聯絡,而是在 Broker 之上抽象出 Bundle 概念,通過 Bundle 與 Topic 建立關係;Topic 通過名稱計算雜湊值,並雜湊分佈到一致性雜湊環中,而雜湊環的每一段都是一個 Bundle。另外 Load Manager 根據 Bundle 的負載情況將後者分配到對應的 Broker 上,將 Bundle 資料儲存在 ZooKeeper 中。由此以來就間接實現了 Topic 與 Broker 之間的聯絡(可參考近期 StreamNative 釋出的 Broker 負載均衡技術文章)。

圖 6. Bundle 與 Topic 建立關係

這裡需要注意:

  • Bundle 的個數影響均衡效果,因為通過一致性雜湊來確認 Topic 應該落在哪個 Bundle 上, Topic 與 Bundle 會存在不均衡分配,某些 Bundle 分配的 Topic 可能較多或較少。Bundle 越多,每個 Bundle 承載的 Topic 越少,粒度越細。依賴於 Pulsar 的負載均衡演算法,均衡效果更好;否則若 Bundle 太大,無論如何解除安裝都很難平衡負載;


  • Bundle 資料和 Broker 對映元資料都維護在 ZooKeeper 中,需要做好 Bundle 數量的規劃。


針對以上兩點,我們根據 Broker 來設定 Bundle 數量設定最小最大值來控制,還可以對流量較大的 Topic 針對性擴大分割槽,讓分割槽均勻分配到 Broker Bundle 上。


Pulsar 雖然提供了海量分割槽能力,但是過多的 Topic 或者分割槽產生的 lookup 也會對叢集產生較大的壓力。叢集管理者需要提前規劃 Bundle 和分割槽設定,杜絕濫用。


另外對 Bundle 的操作需要注意:


  • Pulsar 本身提供了解除安裝操作,可以解除 Bundle 和 Broker 的關聯關係,將 Bundle 重新分配。線上流量較大時應解除安裝 Bundle 而不是整個名稱空間,因為解除安裝後者會導致其上的全部 Bundle 與對應的生產者、消費者斷開,重新進行 lookup。


  • 利用 Bundle split 對流量較大的 Bundle 進行拆分,增加名稱空間的 Bundle 數量,降低影響。


總體而言,使用者需要注意流量的均衡與叢集的穩定性,在叢集管理之初就做好 Bundle 的數量管理和相關測試,謹慎對待大批量 Bundle 解除安裝等運維操作。


2.2  資料的管理


接下來我們從資料的儲存、過期、刪除三個方面來分析。


(1) Ledger 翻轉



首先介紹資料寫入 ledger 的過程。每一個 Topic 分割槽在一段時間內只建立一個 Ledger 維護分割槽寫入的 Entry 的資料歸屬。Topic 分割槽寫入的資料以 Entry 的形式,經過 Broker 寫入 Netty 執行緒處理佇列,執行緒依次根據 Entry 的 Ledger Id,對 Ledger 目錄數取模,寫入到目標磁碟 Ledger 目錄,最終以 Entry Log 和 RocksDB 的索引方式儲存。需要注意,Ledger 是一個分割槽在一段時間內寫入資料的邏輯管理單位,維護了這段資料儲存的 Bookie 位置。一個 Topic 分割槽在一段時間內寫入的資料只被一個活躍 Ledger 管理,待該 Ledger 達到翻轉條件後才會關閉 Ledger 並重新計算,建立新 Ledger 繼續寫入。

圖 7. Ledger 翻轉示意

Ledger 翻轉後,資料才會寫入新的資料目錄。在 Pulsar 中,在滿足 Ledger 最小翻轉時間以及以下條件之一後觸發 Ledger 翻轉:

  • 已達到 Ledger 最大翻轉時間;

  • 已達到 Ledger 的最大 Entry 數量;

  • 已達到 Ledger 的最大大小。


預設值:

觸發ledger翻轉的最小時間:managedLedgerMinLedgerRolloverTimeMinutes=10
觸發ledger翻轉的最長時間:managedLedgerMaxLedgerRolloverTimeMinutes=240
觸發ledger翻轉的最大entry數:managedLedgerMaxEntriesPerLedger=50000
觸發ledger翻轉的最大大小:managedLedgerMaxSizePerLedgerMbytes=2048

注意兩個問題:

  1. Ledger 過大:最小翻轉時間是防止 Ledger 元資料過快增長的手段,但實踐發現如果 Topic 分割槽流量較大,Ledger 的實際值可能遠超上述設定的上限閾值。Ledger 只有在翻轉後才會建立新的 Ledger,Ledger 過大會導致某段時間內寫入某個磁碟的資料過多,產生磁碟儲存不均衡的問題;針對 Ledger 為物件的一些操作也會受到影響,產生無法及時解除安裝資料到二級儲存、資料解除安裝時間較長、還未解除安裝成功但 Ledger 已經過期等問題。


  2. Ledger 間不均衡:Ledger ID 以叢集維度進行遞增。在分割槽的維度,按照 Ledger ID 對 Ledger 儲存目錄數進行取模的方式無法對多磁碟進行均衡寫入。但保持 Ledger 間的大小一致,在一定程度上會對多磁碟目錄的寫入均衡有比較大的改善。


總而言之,建議根據業務訊息情況適當調整 Ledger 翻轉引數和有針對性地增加大流量 Topic 分割槽數量,可以防止 Ledger 過大、大小不均衡的問題。


(2)資料過期


資料過期主要分為四個階段:


第一階段:未被 Ack 的訊息

Backlog 訊息:該段資料不會被刪除


第二階段:已經 Ack 的訊息

  • 訂閱主動 Ack 後,標記為非 backlog 訊息,有多個訂閱時以最慢的為準

  • TTL:若某 Topic 沒有活躍訂閱,超過 TTL 存活時間的訊息會被主動 Ack ,本質上是移動 cursor


第三階段:訊息保留時間檢查

Retention:對已經 Ack 的訊息的保留策略,按保留週期和保留大小設定來保留訊息。


第四階段:訊息刪除

Deleted:超過 Retenion 範圍的訊息則被刪除。超過 rentention 保留週期和保留大小的訊息,系統會從當前已經 ack 訊息的最新位置往前檢查並獲取已經過期的 ledger,將其標記刪除。


圖 8. 訊息保留時間檢查與訊息刪除

從上述的訊息階段演化來看,Pulsar 提供了較大的訊息管理空間,但也略顯複雜。建議叢集維護者建立簡單統一的規則處理資料保留策略,如可以設定 TTL = Retention 保留週期值。


(3) 資料刪除


此處介紹資料的物理刪除。Bookie 在處理資料寫入過程時,會將同一段時間內的資料經過排序 flush 到同一個 Entry Log 檔案中,將索引存放在 RocksDB 中。由於多個 Ledger 的資料可能會同時寫入同一個 Entry Log,因此 Entry Log 便不能被簡單直接的刪除。對此 BookKeeper 會啟動一個 GC(GarbageCollector)  執行緒進行檢查和物理刪除操作。

圖 9. 資料物理刪除流程

Entry Log 維護元資料資訊( EntryLogMetadata),該元資料記錄了 Ledger 列表、大小與剩餘有效資料比例。


GC 清理執行緒在每個 gcWaitTime 時間間隔:

  1. 掃描 Entry Log 的元資料資訊,對於已經沒有有效資料的 entry log 直接進行刪除。

  2. 判斷是否滿足 compaction 條件,滿足 compaction 條件後 GC 執行緒會讀取每一個 Entry 判斷其是否過期,一旦過期就會丟棄,否則會將資料寫入新的 Entry Log。


Compaction 分為 minorCompaction 和 majorCompaction,二者區別在於閾值。預設情況下,minorCompaction 清理間隔 1 小時,閾值 0.2;majorCompaction 清理間隔 24 小時,閾值 0.8。閾值是 Entry Log File 中的剩餘有效資料佔比。

minorCompactionInterval=3600minorCompactionThreshold=0.2majorCompactionThreshold=0.8majorCompactionInterval=86400


在實際使用中,如果機器節點的磁碟較小且資料遲遲得不到刪除,為了及時清除資料,應該按照業務流量和磁碟空間適當調整資料清理間隔時間、有效資料閾值,並配合 compaction 限速策略減小對叢集的影響。


三、Pulsar 監控實踐


vivo 的 Pulsar 指標監控鏈路架構如下:

圖 10. vivo 針對 Pulsar 監控指標搭建的監控架構

該架構中:

  • 採用 Prometheus 採集 Pulsar 指標;

  • 應用 Prometheus 遠端儲存特性將格式化後的指標傳送到 Kafka;

  • Druid 消費 Kafka 資料後可以作為 Grafana 的資料來源,配置 Grafana 面板查詢指標。


為什麼不使用 Prometheus 儲存資料?因為有些資料較久遠,一旦叢集規模增加,監控指標數量級會很大。Prometheus 對資源依賴重,我們只採用了它的採集能力。


下圖是常用的關鍵指標:

圖 11. 關鍵監控指標

指標型別分為:

  • 【客戶端指標】:用來排查客戶端出現的異常

  • 【Broker 端指標】:監控 topic 流量、調整 broker 間流量差距

  • 【Bookie 端指標】:排查讀寫延遲等問題


除了官方指標外,團隊還開發了 Bundle 相關的一些指標:

  • 分割槽數、流量等在 Bundle 的分佈

  • Broker 端記錄讀寫延遲的 P95/P99 值

  • 基於請求對列實現 Broker 端網路負載指標等。


四、問題優化與最佳實踐


4.1 負載均衡引數


負載均衡的目的是對資源平均分配,差異大會影響穩定性。對負載均衡設定的目標是節點流量偏差 20% 以內,每天均衡頻次在 10 次以內,否則客戶端會頻繁斷連、重連。優化後的引數如下:

# load shedding strategy, support OverloadShedder and ThresholdShedder, default is OverloadShedderloadBalancerLoadSheddingStrategy=org.apache.pulsar.Broker.loadbalance.impl.ThresholdShedder
# enable/disable namespace Bundle auto splitloadBalancerAutoBundleSplitEnabled=false
# enable/disable automatic unloading of split BundlesloadBalancerAutoUnloadSplitBundlesEnabled=false
#計算新資源使用量時的CPU使用權重(預設1.0)loadBalancerCPUResourceWeight=0.0
#計算新的資源使用量時的堆記憶體使用權重(預設1.0)loadBalancerMemoryResourceWeight=0.0
#計算新資源使用量時的直接記憶體使用權重(預設1.0)loadBalancerDirectMemoryResourceWeight=0.0


下面三個引數改為零,是因為叢集使用了相同的機型,團隊更關注流量均衡,對記憶體和 CPU 不是特別關注。


以一個具體產品案例來看,其中有 1 個 Topic、30 個分割槽、180 個 Bundle:

圖 12. 1 個 Topic、30 個分割槽、180 個 Bundle 的每秒入流量

上圖節點間流量差異較大,由 Bundle unload 導致。

圖 13. 1 個 Topic、30 個分割槽、180 個 Bundle 下,Bundle 上 Topic 分割槽情況

上圖可看出,有兩個 Bundle 分配了四個分割槽,遠超其他 Bundle。實踐中出現以下問題:

  • 均衡頻次高,一天大概有 200 多次

  • 客戶端連線頻繁切換,流量波動大

  • 每個 Bundle 的分割槽數量分佈差異大


圖 14. 1 個 Topic、30 個分割槽、180 個 Bundle 的入流量分佈

優化過程中,關鍵在於將分割槽打散到不同 Bundle 上,但分割槽數量太少很難做到。Topic 通過雜湊演算法分配到 Bundle 上在前文已經介紹。此案例中,問題在於分割槽數量少。


於是團隊將分割槽增加到 120 個,效果如下:

  • 節點間流量差異小

  • 均衡頻次降低,一天大概有 10 次左右

  • 客戶端連線切換減少,流量波動較小

  • 每個 bundle 的分割槽數量分佈差異降低


圖 15. 1 個 Topic、120 個分割槽、180 個 Bundle 的每秒入流量

圖 16. 1 個 Topic、120 個分割槽、180 個 Bundle 下,Bundle 上 Topic 分割槽情況

圖 17. 1 個 Topic、120 個分割槽、180 個 Bundle 的入流量分佈

4.2 客戶端傳送效能


在和上述業務相同的場景中,分割槽數量增加後,系統滾動重啟後出現了流量下降情況:

圖 18. 單個 Topic,30 個分割槽增加到 120 個,系統滾動重啟後流量下降

客戶端配置引數:


  • memoryLimitBytes=209715200 (預設為 0)

  • maxPendingMessages=2000  (預設 1000)

  • maxPendingMessagesAcrossPartitions=40000 (預設 50000)

  • batchingMaxPublishDelayMicros=50 (預設 1 毫秒)

  • batchingMaxMessages=2000 (預設 1000)

  • batchingMaxBytes=5242880 (預設 128KB)


滿足三個 batch 資料中的任何一個的情況下就會觸發打包、傳送。

圖 19. 重啟後 maxPendingMessages(佇列長度)出現下降

這裡 maxPendingMessages(佇列長度)

=min(maxPendingMessages,maxPendingMessagesAcrossPartitions/partitionNum) 。


而分割槽數新增(30 -> 120)後,需要重啟客戶端才對佇列長度生效。重啟後 maxPendingMessages 佇列長度 從 40000/30 = 1333 變為 40000/120 = 333,出現了明顯下降。


另外,測試發現 batchingMaxMessages 調小後效能提升 10 倍之多:

圖 20. 單個 Topic,30 個分割槽增加到 120 個,調整後效能提升

建議 batchingMaxPublishDelayMicros 不要過大,確保 batchingMaxMessages 比 maxPendingMessages 要大,否則等待 batchingMaxPublishDelayMicros 才會傳送。


4.3 宕機導致叢集流量驟降


某個分割槽佇列滿後會導致傳送執行緒阻塞,影響所有分割槽的整體傳送和叢集穩定性:

圖 21. 執行 Kill-9 一臺 Broker 後,其他 Broker 流量下降

圖 22. 第四個分割槽已滿,傳送執行緒阻塞在 canEnqueRequest 上,等待時間長,其他未滿分割槽的傳送也被影響。

圖 23. 極端情況下,第四個分割槽已滿,其他分割槽等待中。傳送執行緒會在第四個分割槽阻塞等待,其他執行緒無法傳送。

針對這一問題的優化思路,首先是能者多勞,讓傳送快的分割槽儘可能多傳送;然後是將阻塞點從 ProducerImpl 移到 PartitionedProducerImpl;如果分割槽 ProducerImpl 出現佇列已滿阻塞較長時間,就將該分割槽排除。

圖 24. 宕機導致叢集流量驟降優化思路

實踐中可分為可用 Producer 和不可用 Producer 兩個列表。在 ① 中,兩個列表都處於初始化狀態並可用;在 ② 中,某個可用分割槽阻塞一段時間後可以等待一段時間;若不可用就移動到不可用列表中,如 ③ 所示;當分割槽可用比例達到閾值再挪回可用列表,如 ④ 所示。


經過優化後,宕機 Broker 流量可以快速轉移到其他 Broker:

圖 25. 優化後 Broker 流量分流並上漲

注:優化只支援 RoundRobinPartitionMessageRouterImpl 路由策略。


在單個 ProducerImpl 對應的 Broker 出現處理慢、網路慢等導致傳送響應慢的情況,都可能會導致傳送執行緒阻塞,業務傳送訊息的速度受限於最慢的 ProducerImpl 的速度。


五、未來展望


本文分享了 vivo 在 Pulsar 叢集管理與監控的經驗,並介紹 vivo 在負載均衡等方面的最佳實踐。


由於服務端的問題很難通過監控指標進行分析,vivo 在未來計劃實現生產端到消費端的全鏈路監控能力。大資料團隊希望整合大資料元件,支撐 Flink、Spark、Druid 等核心下游元件打通落地。


同時,vivo 內部目前 Pulsar 與 Kafka 同時執行,團隊將嘗試基於 KoP 對存量 Kafka 萬億流量嘗試遷移,降低 Kafka 遷移成本;並探索容器化落地,充分發揮 Pulsar 雲原生優勢。


END

猜你喜歡

本文分享自微信公眾號 - vivo網際網路技術(vivoVMIC)。
如有侵權,請聯絡 [email protected] 刪除。
本文參與“OSC源創計劃”,歡迎正在閱讀的你也加入,一起分享。