TiCDC 6.0原理之Sorter演進

語言: CN / TW / HK

作者: eastfisher 原文來源:https://tidb.net/blog/2c4c2a44

產生背景

TiCDC 是一款 TiDB 增量資料同步工具, 通過拉取上游 TiKV 的資料變更日誌, TiCDC 可以將資料解析為有序的行級變更資料輸出到下游. TiCDC 的典型應用場景包括資料庫災備, 資料整合等.

TiCDC 處理 TiDB 增量資料同步時, 需要經過 CDCKVClient 拉取 TiKV Change Log, Sorter 資料排序, Mounter 訊息格式轉換後經 Sink 傳送到下游資料來源的過程. 其中的 Sorter 模組對保證訊息有序性起著重要作用.

處理流程

TiCDC 的 CDC 任務的邏輯單元是 Changefeed, 使用者可以通過 cdc cli 或者 OpenAPI 向 TiCDC 提交 Changefeed 任務, TiCDC 叢集中的 Owner 會處理對 Changefeed 任務進行解析, 將其拆解為針對每張資料表的 TablePipeline 交給各個 Proessor 處理. Processor 內部會首先由 Puller 通過連線到 TiKV 叢集的 CDC Client 拉取 KV Change Log (RawKVEntry) 並根據 OpType 簡單轉換成 PolymorphicEvent, 交給 Sorter 介面進行排序, 排序完成後再由 Mounter 對訊息進行解析, 然後交給 Sink 傳送給下游資料來源.

tablepipeline.jpeg

Sorter 的排序實現邏輯被封裝在 EventSorter 介面中.

type EventSorter interface {
    Run(ctx context.Context) error
    // 輸入側, 供上游Actor (也就是Puller) 呼叫, 把無序CDC資料放入Sorter
    AddEntry(ctx context.Context, entry *model.PolymorphicEvent)
    TryAddEntry(ctx context.Context, entry *model.PolymorphicEvent) (bool, error)
    // 輸出側, 得到排好序的CDC資料
    Output() <-chan *model.PolymorphicEvent
}

Sorter模組演進

TiCDC 的 Sorter 模組經歷了多次演進, 從最初的基於記憶體的 Memory Sorter, 再發展到基於檔案的 Unified Sorter, 最終演進為目前 v6.0 版本基於 Key-Value 儲存的 DB Sorter.

Memory Sorter

Memory Sorter 用兩個 go slice 分別將未排序的資料變更事件和 Resolved 事件快取到記憶體中. 如果遇到 Resolved 事件, 則非同步發起一次 排序合併 操作.

排序操作使用 go 標準庫的 sort 中的快速排序演算法來實現, 排序規則定義在 ComparePolymorphicEvents 函式中, 按以下順序進行排序:

  1. Commited / Resolved TS 較小的排在前面

  2. Commited / Resolved TS 相同, 則:

    1. Resolved 事件排在最後

    2. Start TS 較小的排在前面

    3. Start TS 相同, DELETE 事件排在 PUT 事件前面

no-alt

排序完成後, 從 resolvedTsGroup 中取最後一個作為 maxResolvedTs, 然後開始執行 merge 操作. 將上一次排好序的事件與本次排好序的事件做二路歸併排序, 如果事件的 Commited / Resolved TS 小於 maxResolvedTs, 則直接傳送到下游, 剩餘事件重新快取到記憶體中, 等待下一個 Resolved TS 事件的到來.

no-alt

由於 Memory Sorter 完全使用記憶體來儲存等待排序的事件, 當上游出現大量資料寫入, 而此時如果下游寫入速度較慢, 導致 Memory Sorter 的 Output 環節出現訊息堆積時, 會導致資料在 Memory Sorter 的記憶體中堆積, 而在缺少 Back Pressure 機制的情況下, 容易引發 OOM. 此外, TiCDC的增量掃環節如果有大量 unresolved 資料堆積在 Memory Sorter, 也易引發OOM. 另一方面, Memory Sorter 是 table 級別的, 每個 Changefeed 中的每個 TablePipeline 都需要建立一個 Sorter 例項, 而 Sorter 內部又會開啟多個 goroutine 進行排序, 當表數量較多時, goroutine 數量也會成倍增多, 給 Go Runtime 排程帶來壓力.

Unified Sorter

Unified Sorter 的出現, 在一定程度上解決了 Memory Sorter 的問題. 該 Sorter 被稱為 Unified 的原因主要在於會在全域性層面對事件排序所需資源進行管理, 而 Memory Sorter 的資源粒度是 table 級別的.

Unified Sorter 在初始化時, 會開啟多個 heapSorter 例項 (預設4個), 並註冊到全域性的 heapSorterPool 中. Unified Sorter 在接收到上游傳送的 PolymorphicEvent 事件後, 會按訊息型別執行不同的分發策略. 對於 Resolved 型別事件, Unified Sorter會將該事件廣播到所有的 heapSorter 例項中. 而對 DELETE / PUT 事件, 則會以 round-robin 策略將訊息路由到對應的 heapSorter 例項.

heapSorter 例項藉助內部 heap 對事件進行排序 (排序規則與 Memory Sorter 相同), 當遇到 Resolved 事件或 heap 記憶體超過閾值時, 會執行一次 flush 操作, 把整個 heap dump出來. flush 操作由全域性單例 backEndPool 統一管理儲存資源, 並由全域性單例 heapSorterIOPool 統一管理計算資源.

no-alt

backEndPool 提供了基於記憶體的 memoryBackEnd 和基於檔案系統的 fileBackEnd 兩種儲存實現, 當記憶體空間足夠時, 優先使用 memoryBackEnd, 而當記憶體空間不足時, 會新建一個檔案, 使用該檔案作為 fileBackEnd 寫入排好序的事件訊息. 檔名的格式為: ${指定路徑名}/sort-pid-${counter}.tmp, 如 /data/sort-10501-1.tmp. 寫入完成後, 會將 flushTask 傳送至 Merger 等待進一步處理.

經過這一步操作, 事件經過記憶體 heap 的堆排序, 再刷出到記憶體或檔案, 形成一個個的靜態 heap (這裡沒有用持久化 heap 來表述). 在 merge 階段, Merger 會再建立一個記憶體 heap, 對當前有效的 flushTask 進行多路歸併排序後, 將事件訊息 Output 到下游.

no-alt

相比 Memory Sorter, Unified Sorter 解決了排序事件全部快取在記憶體中, 有可能引起OOM的問題, 但仍然存在計算資源與 Table 數量成線性關係的問題, 另外引入了一個問題, 即 fileBackEnd 檔案數與表數量成線性關係, 當同步表數量較多時, 會出現 too many open files 的問題.

DB Sorter

DB Sorter 在 v6.0 版本預設不開啟, 在 v6.1 版本才預設開啟, 相關配置項引數名稱為 enable-db-sorter . DB Sorter 底層使用了基於 LSM Tree 的 Key-Value 實現 PeppleDB, 並抽象出了類似 Level DB 的介面, 包括 DB, Batch, Iterator 這3個介面 db.go, 方便今後更換實現或進行測試. 幾個比較核心的操作包括 Put, Delete, Iterator, Compact等.

DB Sorter 採用新的 Actor 框架, 以事件驅動的方式執行整個資料排序處理流程. 關於 Actor 框架的更多設計可通過閱讀 actor doc 進行了解.

DB Sorter 由以下核心模組組成:

  • Sorter: 實現 EventSorter 介面, 作為連線 TablePipeline 與 Sorter Actor的橋樑, 是 Actor 的入口; 將事件 Output 到下游, 也是 Actor 的出口.
  • Writer: 解析 PolymorphicEvent, 進行key統一編碼後傳送給 DBActor.
  • DBActor: 將底層 DB 介面封裝成 Actor, 以事件驅動方式執行 KV 讀寫操作. 可通過配置指定數量, 預設16.
  • Reader: 讀取排好序的事件訊息, Output 到下游, 並把這些事件訊息從 DB 中刪除.
  • CompactActor: 將底層 DB 介面的 Compact 操作封裝成 Actor, 並由 CompactScheduler 統一排程.

以上模組中, Sorter, Writer, Reader 是每張表對應1個, DBActor, CompactActor是配置指定的固定數量, 預設16個.

與 Unified Sorter 類似, DB Sorter 也是全域性唯一的單例, System 在啟動時, 會預設建立 16 個 DB 例項和對應的 Compactor. 將N張表的 CDC 事件訊息對映到M個 DB 上, 並且 DB 只支援讀寫 Key-Value 資料, 因此需要對 Key 編碼做一定設計. DB Sorter 的 Key 編碼格式為:

20220627194955.png

採用這樣的 Key 編碼方式是與之前提到的事件排序規則密切相關, Commited / Resolved TS 在最前, Start TS其次, 最後是事件型別. 此外, 由於 DBActor 並不是每張表獨享的, 因此還需要為每張表劃分一個 namespace, Key 編碼的 unique ID 和 table ID 就唯一確定了當前 DBActor 中這張表對應的 namespace.

整個排序處理流程與 Unified Sorter 比較相似但略有不同, 主要區別在於, DB Sorter 會將同一張表的所有事件訊息路由到同一個 DB 例項上, 這樣就不再需要在Output之前進行多路歸併排序了.

DB Sorter 解決了排序資源使用與表數量成線性關係導致資源佔用大, 資源利用率不高的問題, 官方的效能測試驗證了十萬張表同步到下游可以穩定執行. 但是目前 DB Sorter 並沒有像 Unified Sorter 採用記憶體快取, 導致同步延遲有毫秒級的增加. 相信未來可採用 Unified Sorter 類似的實現機制解決該問題.

參考資料

db sorter design doc

sorter issue

5.3 sorter doc

TiDB 6.0 Book Rush 文章構思指南

TiCDC系列分享-02-剖析同步模型與基本架構