騰訊雲基於 Apache Pulsar 跨地域複製功能實現租戶跨叢集遷移

語言: CN / TW / HK

導語

本文整理自 Pulsar Summit Asia 2022 技術峰會上騰訊雲中間件高階研發工程師韓明澤的分享《基於跨地域複製實現租戶跨叢集遷移》。本文主要介紹基於跨地域資料複製和訂閱進度同步的實現及優化,以及騰訊雲在跨叢集遷移過程中遇到的問題及租戶跨叢集遷移解決方案。

作者簡介

韓明澤

畢業於武漢大學,騰訊雲中間件高階研發工程師,擁有多年訊息中介軟體開發與運維經驗,RoP (RocketMQ-on-Pulsar) Maintainer,Apache Pulsar 貢獻者。

訂閱進度同步的實現及優化

跨地域複製簡介

跨地域複製是 Apache Pulsar 提供的跨機房資料複製能力。其典型的使用場景有:

  • 多機房資料複製,即資料容災備份
  • 異地讀寫

下圖為典型的異地讀寫案例,假定在北京生產與寫入,而在上海消費,在對從生產到消費整體鏈路耗時要求不高的情況下,即可採用跨地域複製的能力。同地域寫入的時間成本相對較低,而比較耗時的消費可以在內部通過跨地域複製遮蔽。

圖片

跨地域複製叢集複製功能實現原理

如果 Apache Pulsar 不提供跨叢集複製功能,如何在運維 RocketMQ 或者 Kafka 等情況下實現跨地域資料複製、容災者備份和叢集間資料遷移的工作?

通常情況下,服務中有生產者和消費者兩個角色,消費者連線上游叢集,生產者連線下游叢集。上游叢集的消費資料通過生產者傳送到下游目標叢集。Apache Pulsar 在跨地域複製的設計中採用了類似思路,跨地域複製實現的流程如下圖所示。

圖片

在每個主題內部設定了 Replication 模組。如果開啟資料複製,此模組則會發起內部訂閱(或遊標進度)。在任一主題內消費訊息時,生產者向對端叢集投遞訊息來實現跨叢集資料的複製功能,不影響本叢集的生產消費。在上述的過程中,訊息的讀取與傳送完全非同步處理。

訂閱進度同步的實現原理

資料複製與同步的實現比較簡單,但在一些場景中,除了同步訊息,還需要同步訂閱的消費進度。

以異地容災為例,假設原本業務的生產消費均在北京,當北京叢集業務出現故障時,業務端想快速將叢集切換到上海叢集,以繼續從北京叢集已經消費到的位置開始做生產和消費。

如果沒有訂閱進度同步的能力,那麼使用者很難確定在北京叢集裡哪些訊息已經消費過;如果從最新的位置開始消費,可能會導致訊息丟失;如果從最早的位置開始消費,會造成大量的重複消費。在實際操作中,稍微折中的方法是通過時間回溯退回到較近的時間點。然而,這種方法無法從根本上解決訊息丟失或者重複消費的問題。

而 Apache Pulsar 所提供的訂閱進度同步的功能,則可以讓使用者平滑地完成異地容災的切換,不用擔心訊息的丟失或者重複。Pulsar 同時支援資料同步和訂閱進度同步,如下圖所示。

圖片

消費進度

消費進度由 markDeletePosition 和 individuallyDeletedMessages 兩部分組成。在 RocketMQ 和 Kafka 中,消費進度在分割槽上通過 Offset 標識。Offset 對應 Pulsar 中的概念可以理解為 markDeletePosition。

Pulsar 同時支援多種消費模式,它的訊息確認機制/簽收機制支援單條確認。因此,在 Pulsar 中除了需要記錄 markDeletePosition,還需要 Individual Acks 記錄單條被確認的訊息。

圖片

如上圖,在共享消費模式下有很多消費者例項。由於每個消費者的消費速度不一樣,訊息的推送順序和訊息 Ack 順序並不完全相同。假定我們需要把標號為 0 到 9 的訊息同時推送給不同的消費者例項,訊息 0、1、2、3、4、6、9 已經確認,但是 5、7、8 並沒有確認。markDeletePosition 的遊標位置,即 Offset 標識的消費進度只能標識到 4 的位置,表示 4(包括 4)之前的訊息都已經被消費。訊息 5、7、8 已經被消費,需要單獨確認。Pulsar 通過 individuallyDeletedMessages 陣列物件範圍去標識哪些訊息已經被確認過。我們可以將上述訊息的確認理解為幾個開閉區間,從中可以明顯得出 5、7、8 沒有被消費。

Message ID 對應關係

在 Pulsar 中,訂閱進度同步的複雜性在於同一條訊息在不同叢集中的 Message ID 不一致,這也是 Pulsar 相較於 Kafka 和 Rocket MQ 而言比較複雜的地方。在 Kafka 分割槽裡只有 Offset 一個概念,而在 Pulsar 中,Message ID 由 Entry ID 和 Ledger ID 組成。同一條訊息在不同叢集裡儲存的 Entry ID 和 Ledger ID 無法保持一致。

圖片

如上圖所示,在 A 和 B 兩個叢集中,訊息 1 在叢集 A 中的 ID 是 1:0,而在叢集 B 中的 ID 是 3:0;訊息 2 在叢集 A 中的 ID 是 1:1,在叢集 B 中的 ID 是 3:1。如果同一條訊息在兩個叢集中的 ID 完全一致,同步消費進度非常容易,比如 ID 為 1:2 的訊息在叢集 A 中被消費,叢集 B 同步確認訊息 1:2 即可。但是由於 Message ID 不一致,或者在不知道 Message ID 間對應關係的情況下,沒有辦法直接將不同叢集間的訊息對應起來。

所以關鍵問題就在於,如何知道 Message ID 間的對應關係?其實這也是最為複雜的地方,我們只有清楚叢集間同一 Message ID 的對應關係,才能在叢集 A 確認訊息 1:2 之後,同步在叢集 B 確認訊息 3:2,或者更新其 markDeletePosition。

構建 Cursor Snapshot

在原生 Pulsar 裡通過定期構造 Cursor Snapshot 的機制來實現 Message ID 間的彼此對應。

圖片

"ReplicatedSubscriptionSnapshotRequest":{
    "snapshot_id":"444D3632-F96C-48D7-83DB-041C32164EC1",
    "source_cluster":"a"
}

以上圖為例,叢集 A 確認訊息 1:2 時通過快速構建 Snapshot 向叢集 B 和 C 傳送請求,請求其告知叢集 A 此訊息在其叢集中的位置資訊。

叢集 A 會定期向其他叢集傳送 Replicate Subscription Snapshot Request。叢集 B 在收到叢集 A 的請求之後會回發響應,將當前複製到的最新訊息位置傳送給叢集 A。

"ReplicatedSubscriptionSnapshotResponse":{
    "snapshotid":"444D3632-F96C-48D7-83DB-041C32164EC1",
    "cluster":{
        "cluster":"b",
        "message_id":{
            "ledger_id":1234,
            "entry_id":45678
            }
    }
}

叢集 A 收到對端叢集 B 和 C 返回的當前訊息的位置後,就會構造起 Message ID 間對應關係。如下圖所示,叢集 A 中 Message ID 為 192.123123 的訊息,在叢集 B 中對應 ID 為 1234.45678,在叢集 C 中對應 ID 為 7655.13421。

程式碼如下:

{
    "snapshot_id":"44403632-F96C-48D7-83DB-041C32164EC1",
    "local_message_id":{
         "ledger_id":192,
         "endtry_id":123123
    },
    "clusters":[
        {
            "cluster":"b",
            "message_id":{
                "ledger_id":1234, 
        "endtry_id":45678
            }
        },
        {
            "cluster":"c",
            "message _id";{
                "ledger_id":7655,
        "endtry_id":13421
            }
        }
    ],
}

Message ID 間對應關係的構建並不複雜,但是實現邏輯會相對複雜。Cursor Snapshot 構造完成之後會形成一種對應關係作為 Cursor Snapshot Maker 寫入到原主題。

圖片

如上圖,阿拉伯數字表示業務主題裡的業務訊息,字母 S 表示不同叢集間構造出來的 Cursor Snapshot 資料。當消費到 Snapshot Marker 時會把對應的 Snapshot Marker 載入到記憶體裡。比如,我們在叢集 A 中 Mark Delete 到訊息 3 的位置時,可以根據 S 裡面記錄的叢集 B 中的訊息位置來更新叢集 B 的 markDeletePosition。

圖片

以上圖為例,在叢集 A 中訊息 1:2 和訊息 1:6 的位置分別有一個 Snapshot,訊息 1:1、1:3、1:4、1:5 和 1:7 是普通訊息。當叢集 A 中 markDeletePosition 更新到 1:4 時,訊息 1:2 在 1:4 之前並且有 Snapshot,就可以快速到叢集 B 去確認訊息 3:4,並更新該位置的 markDeletePosition。

訂閱進度同步

訂閱進度同步過程中存在的問題

訂閱進度同步過程中存在的問題也是租戶跨叢集遷移過程中卡點的問題:

  • 只同步 markDeletePosition,不同步 individuallyDeletedMessages。

這會導致在單條訊息確認時存在很多訊息確認空洞,對存在定時訊息的場景也會產生較大的影響。假定一個主題裡有定時訊息和普通訊息,定時訊息的時間是在一天後,也就意味著定時訊息的確認時間需要延遲一天。由於 markDeletePosition 只能記錄此時已經被全部確認過的訊息的位置,因此在定時訊息被確認時,markDeletePosition 還是一天前的位置。如果使用者此時切換叢集,就會造成訊息重複消費,至少一天的訊息會被重複消費。

  • 訊息堆積會導致無法同步消費進度。這與 Cursor Snapshot 的建立機制有關。

如前文提到的,通過叢集 A 構建與叢集 B 和 C 之間的 Snapshot 時的請求並不是通過 RPC 介面發出的,而是藉由我們此前提到的 “S” 帶入。在訂閱進度或者訊息同步的過程中,訊息堆積不可避免,導致請求也被寫入到本地主題。由於對端訊息堆積,且主題內部都會設定超時機制,如果在規定時間內收不到構建 Snapshot 的請求,Snapshot 就無法構建成功,進而無法同步訂閱進度,markDeletePosition 也無法同步。

  • 定期 Cursor Snapshot 機制。

沿用前面我們在講同步消費進度時所提到的案例,在叢集 A 和叢集 B 中,叢集 A 中訊息 1:2 與訊息 1:6 與叢集 B 之間有 Snapshot,普通訊息間沒有 Snapshot。如果此時叢集 A 的 markDeletePosition 更新到 1:4,由於此位置上兩個叢集之間並不存在 Snapshot,所以叢集 A 無法確認該條訊息在叢集 B 中對應訊息位置,這也是當前機制中存在的問題。

綜上所述,訂閱進度同步過程中存在的問題主要在於只同步 markDeletePosition 而不同步 individuallyDeletedMessages,有時雖然同步了 markDeletePosition,但由於本身機制的問題會影響準確性或者出現訊息堆積的情況。

訂閱進度同步優化

上述問題在租戶遷移過程中會造成大量的重複消費,常見且難解。在一些真實使用者線上業務場景中,少量、短暫且可控範圍內的重複消費可以接受,大量的重複消費不允許存在。

為了解決上面的問題,我們優化了訂閱進度同步的邏輯,在叢集遷移之前需要同步 markDeletePosition 和 individuallyDeletedMessages。在同步過程中,最大的問題仍然是同一訊息在不同叢集中 Message ID 的對應。即使叢集 A 的 markDeletePosition 和 individuallyDeletedMessages 全部都同步到叢集 B,但是叢集 B 仍然無法確定 individuallyDeletedMessages 對應的本叢集的 Message ID。

為了解決這個問題,我們在原始叢集(叢集 A)傳送訊息到叢集 B 時,在訊息的 Metadata 里加入了叢集 A 裡的 Entry Position(Message ID)和 originalClusterPosition 的屬性來攜帶訊息寫入的位置。

圖片

這樣,當我們在叢集 B 進行消費時,可以快捷地從 originalClusterPosition 屬性中獲取到叢集 A 的 Message ID,將其與叢集 A 同步到叢集 B 的 individuallyDeletedMessages 進行比較。如果訊息已經被確認過就直接跳過此條訊息,不再發送給消費者。通過這樣的方法實現對已確認訊息的過濾。

圖片

具體實現邏輯如上圖。在遷移叢集遷移之前,需要先將叢集 1 中 individuallyDeletedMessages 的訂閱同步到叢集 2。在將訊息推送給消費者之前,訊息會先經過 Filter Entries For Consumer 過濾掉叢集 1 中已經消費過的訊息,將未消費的訊息推送給叢集 2 中的消費者。

上述實現邏輯只是一種思路的轉換。因為在 Pulsar 中,進度同步實現在叢集 1 上,叢集 2 中的訊息不斷同步到叢集 1,通過不斷構建 Snapshot 記錄叢集 1 和叢集 2 位置對應關係,這樣在叢集 1 確認訊息時,可以同步確認叢集 2 對應位置。我們的優化方法是把叢集 1 中訊息的位置資訊放在訊息裡,通過同步 individuallyDeletedMessages 和 markDeletePosition 將進度同步到叢集 2,在叢集 2 實際消費時過濾。通過這種方式將重複消費控制在使用者可接受範圍內。

租戶跨叢集遷移的實現

早期騰訊雲內部的叢集是共享叢集,不同業務場景的使用者使用同一套物理叢集。有大規模訊息佇列運維經驗的同學知道,不同使用者混用同一叢集會使使用者之間互相影響。使用者對服務的要求不同,需要為對服務質量要求比較高的使用者搭建獨佔叢集,物理資源隔離來減少對其他使用者的影響。這時需要有平滑的遷移方案實現叢集的順利遷移。

租戶遷移整體架構

圖片

上圖為騰訊雲內部實現租戶跨叢集遷移的架構圖,其中最核心的模組 Lookup Service 是騰訊雲內部代理客戶端 Lookup 請求的服務模組,儲存每個租戶到物理叢集的對映關係。我們根據租戶將使用者客戶端 Lookup 請求轉發到對應的物理叢集,進而獲取使用者客戶端收發訊息時所需要連線的 Broker 節點。需要注意的是,Lookup Service 不僅僅代理 Lookup 請求,還代理 getPartitionState、getPartitionMydata 和 getSchema 等請求,但不代理包含資料流的請求。資料流請求通過 CLB 或 VIP 直接連到叢集來收發訊息,並不經過 Lookup Service。

其實 Lookup Service 不是為了跨叢集遷移而誕生,它的主要目的是在多種網路接入訪問場景下,為雲上叢集提供集中處理不同網路服務路由的能力。在公有云上不只存在通過簡單的 Broker IP 就能連線的內網使用者,還需要通過公網 CLB、VPC 或 VIP 服務進行轉發,Lookup Service 主要應用於這方面。我們跨叢集遷移時利用了 Lookup Service 能力來保證叢集切換簡單順利地實現,同時藉助於跨地域複製的同步功能把資料從原有叢集遷移到目標叢集上。遷移完成後,通過 Lookup Service 的切入能力最終實現租戶跨叢集遷移。

租戶跨叢集遷移的主要流程

接下來介紹跨叢集遷移的具體流程。

圖片

  1. 同步元資料。在目標叢集上按照原叢集的租戶、名稱空間、主題和訂閱角色等資源,完成元資料同步。

  2. 開啟跨地域複製功能,遷移租戶下的主題資料。

  3. 在叢集切換前開啟訂閱進度同步功能,把每個訂閱的 individuallyDeletedMessages 和 Mark Delete Messages 同步到目標叢集上。

  4. 修改 Lookup Service 中租戶與物理叢集的對應關係,主動呼叫 Unload 觸發客戶端重新定址。Lookup Service 根據新租戶到物理叢集的對應關係返回新物理叢集的地址。

  5. 在遷移完成後,清理原叢集上的資源。

總結

實現租戶跨叢集遷移的方式有很多,本文只分享一種在公有云上實現改造成本較低、複雜程度較小並且可靠性較高的方案。這種方案不需要對現有 Pulsar 客戶端和服務端協議做任何改動就可以實現平滑遷移。