騰訊雲基於 Apache Pulsar 跨地域複製功能實現租户跨集羣遷移

語言: CN / TW / HK

導語

本文整理自 Pulsar Summit Asia 2022 技術峯會上騰訊雲中間件高級研發工程師韓明澤的分享《基於跨地域複製實現租户跨集羣遷移》。本文主要介紹基於跨地域數據複製和訂閲進度同步的實現及優化,以及騰訊雲在跨集羣遷移過程中遇到的問題及租户跨集羣遷移解決方案。

作者簡介

韓明澤

畢業於武漢大學,騰訊雲中間件高級研發工程師,擁有多年消息中間件開發與運維經驗,RoP (RocketMQ-on-Pulsar) Maintainer,Apache Pulsar 貢獻者。

訂閲進度同步的實現及優化

跨地域複製簡介

跨地域複製是 Apache Pulsar 提供的跨機房數據複製能力。其典型的使用場景有:

  • 多機房數據複製,即數據容災備份
  • 異地讀寫

下圖為典型的異地讀寫案例,假定在北京生產與寫入,而在上海消費,在對從生產到消費整體鏈路耗時要求不高的情況下,即可採用跨地域複製的能力。同地域寫入的時間成本相對較低,而比較耗時的消費可以在內部通過跨地域複製屏蔽。

圖片

跨地域複製集羣複製功能實現原理

如果 Apache Pulsar 不提供跨集羣複製功能,如何在運維 RocketMQ 或者 Kafka 等情況下實現跨地域數據複製、容災者備份和集羣間數據遷移的工作?

通常情況下,服務中有生產者和消費者兩個角色,消費者連接上游集羣,生產者連接下游集羣。上游集羣的消費數據通過生產者發送到下游目標集羣。Apache Pulsar 在跨地域複製的設計中採用了類似思路,跨地域複製實現的流程如下圖所示。

圖片

在每個主題內部設置了 Replication 模塊。如果開啟數據複製,此模塊則會發起內部訂閲(或遊標進度)。在任一主題內消費消息時,生產者向對端集羣投遞消息來實現跨集羣數據的複製功能,不影響本集羣的生產消費。在上述的過程中,消息的讀取與發送完全異步處理。

訂閲進度同步的實現原理

數據複製與同步的實現比較簡單,但在一些場景中,除了同步消息,還需要同步訂閲的消費進度。

以異地容災為例,假設原本業務的生產消費均在北京,當北京集羣業務出現故障時,業務端想快速將集羣切換到上海集羣,以繼續從北京集羣已經消費到的位置開始做生產和消費。

如果沒有訂閲進度同步的能力,那麼用户很難確定在北京集羣裏哪些消息已經消費過;如果從最新的位置開始消費,可能會導致消息丟失;如果從最早的位置開始消費,會造成大量的重複消費。在實際操作中,稍微折中的方法是通過時間回溯退回到較近的時間點。然而,這種方法無法從根本上解決消息丟失或者重複消費的問題。

而 Apache Pulsar 所提供的訂閲進度同步的功能,則可以讓用户平滑地完成異地容災的切換,不用擔心消息的丟失或者重複。Pulsar 同時支持數據同步和訂閲進度同步,如下圖所示。

圖片

消費進度

消費進度由 markDeletePosition 和 individuallyDeletedMessages 兩部分組成。在 RocketMQ 和 Kafka 中,消費進度在分區上通過 Offset 標識。Offset 對應 Pulsar 中的概念可以理解為 markDeletePosition。

Pulsar 同時支持多種消費模式,它的消息確認機制/簽收機制支持單條確認。因此,在 Pulsar 中除了需要記錄 markDeletePosition,還需要 Individual Acks 記錄單條被確認的消息。

圖片

如上圖,在共享消費模式下有很多消費者實例。由於每個消費者的消費速度不一樣,消息的推送順序和消息 Ack 順序並不完全相同。假定我們需要把標號為 0 到 9 的消息同時推送給不同的消費者實例,消息 0、1、2、3、4、6、9 已經確認,但是 5、7、8 並沒有確認。markDeletePosition 的遊標位置,即 Offset 標識的消費進度只能標識到 4 的位置,表示 4(包括 4)之前的消息都已經被消費。消息 5、7、8 已經被消費,需要單獨確認。Pulsar 通過 individuallyDeletedMessages 數組對象範圍去標識哪些消息已經被確認過。我們可以將上述消息的確認理解為幾個開閉區間,從中可以明顯得出 5、7、8 沒有被消費。

Message ID 對應關係

在 Pulsar 中,訂閲進度同步的複雜性在於同一條消息在不同集羣中的 Message ID 不一致,這也是 Pulsar 相較於 Kafka 和 Rocket MQ 而言比較複雜的地方。在 Kafka 分區裏只有 Offset 一個概念,而在 Pulsar 中,Message ID 由 Entry ID 和 Ledger ID 組成。同一條消息在不同集羣裏存儲的 Entry ID 和 Ledger ID 無法保持一致。

圖片

如上圖所示,在 A 和 B 兩個集羣中,消息 1 在集羣 A 中的 ID 是 1:0,而在集羣 B 中的 ID 是 3:0;消息 2 在集羣 A 中的 ID 是 1:1,在集羣 B 中的 ID 是 3:1。如果同一條消息在兩個集羣中的 ID 完全一致,同步消費進度非常容易,比如 ID 為 1:2 的消息在集羣 A 中被消費,集羣 B 同步確認消息 1:2 即可。但是由於 Message ID 不一致,或者在不知道 Message ID 間對應關係的情況下,沒有辦法直接將不同集羣間的消息對應起來。

所以關鍵問題就在於,如何知道 Message ID 間的對應關係?其實這也是最為複雜的地方,我們只有清楚集羣間同一 Message ID 的對應關係,才能在集羣 A 確認消息 1:2 之後,同步在集羣 B 確認消息 3:2,或者更新其 markDeletePosition。

構建 Cursor Snapshot

在原生 Pulsar 裏通過定期構造 Cursor Snapshot 的機制來實現 Message ID 間的彼此對應。

圖片

"ReplicatedSubscriptionSnapshotRequest":{
    "snapshot_id":"444D3632-F96C-48D7-83DB-041C32164EC1",
    "source_cluster":"a"
}

以上圖為例,集羣 A 確認消息 1:2 時通過快速構建 Snapshot 向集羣 B 和 C 發送請求,請求其告知集羣 A 此消息在其集羣中的位置信息。

集羣 A 會定期向其他集羣發送 Replicate Subscription Snapshot Request。集羣 B 在收到集羣 A 的請求之後會回發響應,將當前複製到的最新消息位置發送給集羣 A。

"ReplicatedSubscriptionSnapshotResponse":{
    "snapshotid":"444D3632-F96C-48D7-83DB-041C32164EC1",
    "cluster":{
        "cluster":"b",
        "message_id":{
            "ledger_id":1234,
            "entry_id":45678
            }
    }
}

集羣 A 收到對端集羣 B 和 C 返回的當前消息的位置後,就會構造起 Message ID 間對應關係。如下圖所示,集羣 A 中 Message ID 為 192.123123 的消息,在集羣 B 中對應 ID 為 1234.45678,在集羣 C 中對應 ID 為 7655.13421。

代碼如下:

{
    "snapshot_id":"44403632-F96C-48D7-83DB-041C32164EC1",
    "local_message_id":{
         "ledger_id":192,
         "endtry_id":123123
    },
    "clusters":[
        {
            "cluster":"b",
            "message_id":{
                "ledger_id":1234, 
        "endtry_id":45678
            }
        },
        {
            "cluster":"c",
            "message _id";{
                "ledger_id":7655,
        "endtry_id":13421
            }
        }
    ],
}

Message ID 間對應關係的構建並不複雜,但是實現邏輯會相對複雜。Cursor Snapshot 構造完成之後會形成一種對應關係作為 Cursor Snapshot Maker 寫入到原主題。

圖片

如上圖,阿拉伯數字表示業務主題裏的業務消息,字母 S 表示不同集羣間構造出來的 Cursor Snapshot 數據。當消費到 Snapshot Marker 時會把對應的 Snapshot Marker 加載到內存裏。比如,我們在集羣 A 中 Mark Delete 到消息 3 的位置時,可以根據 S 裏面記錄的集羣 B 中的消息位置來更新集羣 B 的 markDeletePosition。

圖片

以上圖為例,在集羣 A 中消息 1:2 和消息 1:6 的位置分別有一個 Snapshot,消息 1:1、1:3、1:4、1:5 和 1:7 是普通消息。當集羣 A 中 markDeletePosition 更新到 1:4 時,消息 1:2 在 1:4 之前並且有 Snapshot,就可以快速到集羣 B 去確認消息 3:4,並更新該位置的 markDeletePosition。

訂閲進度同步

訂閲進度同步過程中存在的問題

訂閲進度同步過程中存在的問題也是租户跨集羣遷移過程中卡點的問題:

  • 只同步 markDeletePosition,不同步 individuallyDeletedMessages。

這會導致在單條消息確認時存在很多消息確認空洞,對存在定時消息的場景也會產生較大的影響。假定一個主題裏有定時消息和普通消息,定時消息的時間是在一天後,也就意味着定時消息的確認時間需要延遲一天。由於 markDeletePosition 只能記錄此時已經被全部確認過的消息的位置,因此在定時消息被確認時,markDeletePosition 還是一天前的位置。如果用户此時切換集羣,就會造成消息重複消費,至少一天的消息會被重複消費。

  • 消息堆積會導致無法同步消費進度。這與 Cursor Snapshot 的創建機制有關。

如前文提到的,通過集羣 A 構建與集羣 B 和 C 之間的 Snapshot 時的請求並不是通過 RPC 接口發出的,而是藉由我們此前提到的 “S” 帶入。在訂閲進度或者消息同步的過程中,消息堆積不可避免,導致請求也被寫入到本地主題。由於對端消息堆積,且主題內部都會設置超時機制,如果在規定時間內收不到構建 Snapshot 的請求,Snapshot 就無法構建成功,進而無法同步訂閲進度,markDeletePosition 也無法同步。

  • 定期 Cursor Snapshot 機制。

沿用前面我們在講同步消費進度時所提到的案例,在集羣 A 和集羣 B 中,集羣 A 中消息 1:2 與消息 1:6 與集羣 B 之間有 Snapshot,普通消息間沒有 Snapshot。如果此時集羣 A 的 markDeletePosition 更新到 1:4,由於此位置上兩個集羣之間並不存在 Snapshot,所以集羣 A 無法確認該條消息在集羣 B 中對應消息位置,這也是當前機制中存在的問題。

綜上所述,訂閲進度同步過程中存在的問題主要在於只同步 markDeletePosition 而不同步 individuallyDeletedMessages,有時雖然同步了 markDeletePosition,但由於本身機制的問題會影響準確性或者出現消息堆積的情況。

訂閲進度同步優化

上述問題在租户遷移過程中會造成大量的重複消費,常見且難解。在一些真實用户在線業務場景中,少量、短暫且可控範圍內的重複消費可以接受,大量的重複消費不允許存在。

為了解決上面的問題,我們優化了訂閲進度同步的邏輯,在集羣遷移之前需要同步 markDeletePosition 和 individuallyDeletedMessages。在同步過程中,最大的問題仍然是同一消息在不同集羣中 Message ID 的對應。即使集羣 A 的 markDeletePosition 和 individuallyDeletedMessages 全部都同步到集羣 B,但是集羣 B 仍然無法確定 individuallyDeletedMessages 對應的本集羣的 Message ID。

為了解決這個問題,我們在原始集羣(集羣 A)發送消息到集羣 B 時,在消息的 Metadata 里加入了集羣 A 裏的 Entry Position(Message ID)和 originalClusterPosition 的屬性來攜帶消息寫入的位置。

圖片

這樣,當我們在集羣 B 進行消費時,可以快捷地從 originalClusterPosition 屬性中獲取到集羣 A 的 Message ID,將其與集羣 A 同步到集羣 B 的 individuallyDeletedMessages 進行比較。如果消息已經被確認過就直接跳過此條消息,不再發送給消費者。通過這樣的方法實現對已確認消息的過濾。

圖片

具體實現邏輯如上圖。在遷移集羣遷移之前,需要先將集羣 1 中 individuallyDeletedMessages 的訂閲同步到集羣 2。在將消息推送給消費者之前,消息會先經過 Filter Entries For Consumer 過濾掉集羣 1 中已經消費過的消息,將未消費的消息推送給集羣 2 中的消費者。

上述實現邏輯只是一種思路的轉換。因為在 Pulsar 中,進度同步實現在集羣 1 上,集羣 2 中的消息不斷同步到集羣 1,通過不斷構建 Snapshot 記錄集羣 1 和集羣 2 位置對應關係,這樣在集羣 1 確認消息時,可以同步確認集羣 2 對應位置。我們的優化方法是把集羣 1 中消息的位置信息放在消息裏,通過同步 individuallyDeletedMessages 和 markDeletePosition 將進度同步到集羣 2,在集羣 2 實際消費時過濾。通過這種方式將重複消費控制在用户可接受範圍內。

租户跨集羣遷移的實現

早期騰訊雲內部的集羣是共享集羣,不同業務場景的用户使用同一套物理集羣。有大規模消息隊列運維經驗的同學知道,不同用户混用同一集羣會使用户之間互相影響。用户對服務的要求不同,需要為對服務質量要求比較高的用户搭建獨佔集羣,物理資源隔離來減少對其他用户的影響。這時需要有平滑的遷移方案實現集羣的順利遷移。

租户遷移整體架構

圖片

上圖為騰訊雲內部實現租户跨集羣遷移的架構圖,其中最核心的模塊 Lookup Service 是騰訊雲內部代理客户端 Lookup 請求的服務模塊,保存每個租户到物理集羣的映射關係。我們根據租户將用户客户端 Lookup 請求轉發到對應的物理集羣,進而獲取用户客户端收發消息時所需要連接的 Broker 節點。需要注意的是,Lookup Service 不僅僅代理 Lookup 請求,還代理 getPartitionState、getPartitionMydata 和 getSchema 等請求,但不代理包含數據流的請求。數據流請求通過 CLB 或 VIP 直接連到集羣來收發消息,並不經過 Lookup Service。

其實 Lookup Service 不是為了跨集羣遷移而誕生,它的主要目的是在多種網絡接入訪問場景下,為雲上集羣提供集中處理不同網絡服務路由的能力。在公有云上不只存在通過簡單的 Broker IP 就能連接的內網用户,還需要通過公網 CLB、VPC 或 VIP 服務進行轉發,Lookup Service 主要應用於這方面。我們跨集羣遷移時利用了 Lookup Service 能力來保證集羣切換簡單順利地實現,同時藉助於跨地域複製的同步功能把數據從原有集羣遷移到目標集羣上。遷移完成後,通過 Lookup Service 的切入能力最終實現租户跨集羣遷移。

租户跨集羣遷移的主要流程

接下來介紹跨集羣遷移的具體流程。

圖片

  1. 同步元數據。在目標集羣上按照原集羣的租户、命名空間、主題和訂閲角色等資源,完成元數據同步。

  2. 開啟跨地域複製功能,遷移租户下的主題數據。

  3. 在集羣切換前開啟訂閲進度同步功能,把每個訂閲的 individuallyDeletedMessages 和 Mark Delete Messages 同步到目標集羣上。

  4. 修改 Lookup Service 中租户與物理集羣的對應關係,主動調用 Unload 觸發客户端重新尋址。Lookup Service 根據新租户到物理集羣的對應關係返回新物理集羣的地址。

  5. 在遷移完成後,清理原集羣上的資源。

總結

實現租户跨集羣遷移的方式有很多,本文只分享一種在公有云上實現改造成本較低、複雜程度較小並且可靠性較高的方案。這種方案不需要對現有 Pulsar 客户端和服務端協議做任何改動就可以實現平滑遷移。