Flink CDC核心元件 Debezium 支援 MySQL 的只讀增量快照!

語言: CN / TW / HK

Shopify 的工程團隊最近改進了 Debezium MySQL 聯結器,以便它支援資料庫的增量快照,而無需聯結器的寫入訪問,這是將 Debezium 指向只讀副本時所必需的。 此外,Debezium MySQL 聯結器現在還允許在增量快照期間更改架構。 這篇博文解釋了這些功能的實現細節。

為什麼是隻讀的?

在 Netflix 宣佈 他們的變更資料捕獲框架 之後, Debezium 在 1.6 版本中添加了 增量快照功能 在 Shopify, 我們使用 Debezium 進行變更資料捕獲 (CDC) ,我們期待成為早期採用者。 此外,我們希望有一個無寫入和無鎖的解決方案。

無寫入解決方案允許從只讀副本捕獲更改,並提供最高保證 CDC 不會導致資料庫端資料損壞。

過去,我們不得不協調快照與遷移,因為模式遷移封鎖影響了其他專案的開發。 解決方案是僅在週末執行快照,因此,我們嘗試儘可能少地進行快照。 我們也看到了改進這部分流程的機會。

這篇博文深入探討了只讀增量快照實現的技術細節,包括 MySQL 聯結器中增量快照期間的無鎖模式更改處理。

增量快照

Debezium 部落格文章中 的增量快照 詳細介紹了預設實現。 該演算法利用信令表來處理兩種型別的訊號:

  1. snapshot-window-open/snapshot-window-close 作為水印

  2. execute-snapshot 作為觸發增量快照的一種方式

對於只讀場景,我們需要用備選方案替換這兩種型別的訊號。

顯示高水印和低水印的主狀態

該解決方案特定於 MySQL,並依賴於 全域性事務識別符號 (GTID) 因此,如果您從只讀副本讀取,則 需要設定 gtid_mode 並配置資料庫以保留 GTID 順序。 ON

先決條件:

gtid_mode = ON
enforce_gtid_consistency = ON
if replica_parallel_workers > 0 set replica_preserve_commit_order = ON

該演算法執行 SHOW MASTER STATUS 查詢以獲取在塊選擇之前和之後設定的已執行 GTID:

low watermark = executed_gtid_set
high watermark = executed_gtid_set - low watermark

在只讀實現中,水印具有 GTID 集的形式,例如: 2174B383-5441-11E8-B90A-C80AA9429562:1-3, 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19

這樣的水印不會出現在 binlog 流中。 相反,該演算法將每個事件的 GTID 與記憶體中的水印進行比較。 該實現確保沒有過時的讀取,並且塊僅具有不比事件不早於低水位線的更改。

具有隻讀水印的重複資料刪除演算法

在虛擬碼中,對從 binlog 讀取的事件和通過快照塊檢索的事件進行去重的演算法如下所示:

(1) pause log event processing
  (2) GtidSet lwGtidSet := executed_gtid_set from SHOW MASTER STATUS
  (3) chunk := select next chunk from table
  (4) GtidSet hwGtidSet := executed_gtid_set from SHOW MASTER STATUS subtracted by lwGtidSet
  (5) resume log event processing
      inwindow := false
      // other steps of event processing loop
      while true do
           e := next event from changelog
           append e to outputbuffer
           if not inwindow then
               if not lwGtidSet.contains(e.gtid) //reached the low watermark
                   inwindow := true
           else
               if hwGtidSet.contains(e.gtid) //haven't reached the high watermark yet
                   if chunk contains e.key then
                       remove e.key from chunk
               else //reached the high watermark
                   for each row in chunk do
                       append row to outputbuffer
           // other steps of event processing loop

水印檢查

一個數據庫事務可以更改幾行。 在這種情況下,多個 binlog 事件將具有相同的 GTID。 由於 GTID 不是唯一的,它會影響計算塊選擇視窗的邏輯。 當水印的 GTID 集不包含其 GTID 時,事件會更新視窗狀態。 在事務完成和心跳等事件之後,不會再有任何具有相同 GTID 的 binlog 事件。 對於這些事件,達到水印的上限就足以觸發視窗開啟/關閉。

圖 1. 塊選擇視窗

與預設實現一樣,重複資料刪除發生在塊選擇視窗內。 最後,演算法在高水位線之後插入一個去重塊:

圖 2. 塊重複資料刪除

包含的表沒有更新

接收二進位制日誌事件對於快照取得進展至關重要。 因此,該演算法會檢查 所有 事件的 GTID 以及未包含的表。

沒有二進位制日誌事件

MySQL 伺服器在複製連線空閒 x 秒後傳送心跳事件。 當二進位制日誌更新率較低時,只讀實現使用心跳。

心跳與最新的 binlog 事件具有相同的 GTID。 因此,對於一個心跳來說,達到高水位線的上限就足夠了。

該演算法使用 server_uuid 心跳的 GTID 的一部分從高水位線中獲取最大事務 ID。 該實現確保高水位線包含單個 server_uuid 未更改 server_uuid 允許避免由於心跳過早關閉視窗的情況。 以下圖為例:

圖 3. 視窗因心跳過早關閉的場景

不需要與低水位線進行心跳比較,因為視窗是否開啟並不重要。 這簡化了在高水位線和低水位線之間沒有新事件時的檢查。

水印之間沒有變化

當塊選擇期間沒有二進位制日誌事件時,二進位制日誌事件可以立即開啟和關閉一個視窗。 在這種情況下,高水印將是一個空集。 在這種情況下,快照塊會立即插入到低水位線之後,而不會進行重複資料刪除。

圖 4. 一個空的塊選擇視窗

基於 Kafka 主題的訊號

Debezium 支援通過插入訊號表觸發的臨時增量快照。 只讀的替代方法是通過特定的 Kafka 主題傳送訊號。 訊息的格式模仿信令表結構。 執行快照 Kafka 訊息包含引數

  • data-collections - 要捕獲的表列表

  • type - 設定為增量

例子:

Key: dbserver1
Value: {"type":"execute-snapshot","data": {"data-collections": ["inventory.orders"], "type": "INCREMENTAL"}}

MySQL 聯結器的配置有一個新 signal.kafka.topic 屬性。 主題必須有一個分割槽和刪除保留策略。

一個單獨的執行緒從 Kafka 主題中檢索訊號訊息。 Kafka 訊息的 key 需要與 中設定的聯結器名稱匹配 database.server.name 聯結器將使用日誌條目跳過與聯結器名稱不對應的事件。 訊息鍵檢查允許為多個聯結器重用訊號主題。

當增量快照執行時,聯結器的偏移量包括增量快照上下文。 只讀實現將 Kafka 訊號偏移量新增到增量快照上下文中。 跟蹤偏移允許它在聯結器重新啟動時不會錯過或雙重處理訊號。

但是,不需要使用 Kafka 執行只讀增量快照, execute-snapshot 寫入訊號表的預設訊號也可以工作。 展望未來,還可以設想一個用於觸發臨時增量快照的 REST API,或者通過 Debezium Server 公開,或者作為部署到 Kafka Connect 的附加 REST 資源。

增量快照期間的架構更改

Debezium MySQL 聯結器 允許在增量快照期間更改模式 聯結器將在增量快照期間檢測架構更改並重新選擇當前塊以避免鎖定 DDL。

請注意,不支援對主鍵的更改,如果在增量快照期間執行,可能會導致不正確的結果。

歷史化的 Debezium 聯結器,例如 MySQL 解析資料定義語言 (DDL) 事件,例如 ALTER TABLE 來自 binlog 流。 聯結器保留每個表模式的記憶體表示,並使用這些模式來生成適當的更改事件。

增量快照實現兩次使用 binlog 模式:

  1. 在從資料庫中選擇塊的那一刻

  2. 在塊插入到 binlog 流的那一刻

塊的模式必須在兩個時間都匹配二進位制日誌模式。 讓我們詳細探討一下演算法是如何實現匹配模式的。

選擇時匹配塊和二進位制日誌模式

當增量快照查詢資料庫時,行具有表的最新架構。 如果 binlog 流落後,記憶體中的 schema 可能與最新的 schema 不同。 解決方法是等待聯結器在binlog流中接收到DDL事件。 之後,聯結器可以使用快取表的結構來生成正確的增量快照事件。

使用 JDBC API 選擇快照塊。 ResultSetMetaData 儲存塊的模式。 挑戰在於 ResultSetMetaData 中的 schema 和 binlog DDL 中的 schema 具有不同的格式,因此很難確定它們是否相同。

該演算法使用兩個步驟來獲得匹配的基於 ResultSet 和基於 DDL 的模式。 首先,聯結器在低水位線和高水位線之間查詢表的模式。 一旦聯結器檢測到視窗關閉,binlog 模式就會與 ResultSetMetaData 保持同步。 之後,聯結器查詢資料庫以驗證架構是否保持不變。 如果架構已更改,則聯結器將重複該過程。

該演算法將匹配的 ResultSet 和 binlog 模式儲存在記憶體中,以允許聯結器將每個塊的模式與快取的 ResultSet 模式進行比較。

當塊的架構與快取的 ResultSet 架構不匹配時,聯結器會丟棄選定的塊。 然後演算法重複匹配 ResultSet 和 binlog 模式的驗證過程。 之後,聯結器從資料庫中重新選擇相同的塊:

圖 5. Binlog 模式與塊選擇上的塊模式不匹配

插入時匹配塊和二進位制日誌模式

DDL 事件還會觸發受影響表的塊重新讀取。 重新讀取可防止當塊具有比 binlog 流具有的視窗關閉時更舊的架構時的情況。 例如,下圖說明了架構更改之前發生的塊選擇:

圖 6. Binlog 模式與塊插入時的塊模式不匹配

演示

我們將使用標準 教程部署 來演示只讀臨時增量快照。 我們使用 MySQL 作為源資料庫。 對於此演示,您將需要開啟多個終端視窗。

一開始我們將開始部署,建立信令 Kafka 主題,並啟動聯結器:

# Terminal 1 - start the deployment
# Start the deployment
export DEBEZIUM_VERSION=1.9
docker-compose -f docker-compose-mysql.yaml up

# Terminal 2

# Enable enforce_gtid_consistency and gtid_mode
docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -p$MYSQL_ROOT_PASSWORD inventory -e "SET GLOBAL enforce_gtid_consistency=ON; SET GLOBAL gtid_mode=OFF_PERMISSIVE; SET GLOBAL gtid_mode=ON_PERMISSIVE; SET GLOBAL gtid_mode=ON;"'

# Confirm the changes
docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -p$MYSQL_ROOT_PASSWORD inventory -e "show global variables like \"%GTID%\";"'

# Create a signaling topic
docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-topics.sh \
    --create \
    --bootstrap-server kafka:9092 \
    --partitions 1 \
    --replication-factor 1 \
    --topic dbz-signals

# Start MySQL connector, capture only customers table and enable signaling
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @- <<EOF
{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "database.include.list": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
        "table.include.list": "inventory.customers",
        "read.only": "true",
        "incremental.snapshot.allow.schema.changes": "true",
        "incremental.snapshot.chunk.size": "5000",
        "signal.kafka.topic": "dbz-signals",
        "signal.kafka.bootstrap.servers": "kafka:9092"
    }
}
EOF

從日誌中我們看到,根據 table.include.list 設定,只有一張表被快照, customers

教程-連線-1 | 2022-02-21 04:30:03,936 INFO MySQL|dbserver1|snapshot 在事務中快照 1 個表的內容 [io.debezium.relational.RelationalSnapshotChangeEventSource]

在下一步中,我們將模擬資料庫中的連續活動:

# Terminal 3
# Continuously consume messages from Debezium topic for customers table
docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic dbserver1.inventory.customers

# Terminal 4
# Modify records in the database via MySQL client
docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'i=0; while true; do mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory -e "INSERT INTO customers VALUES(default, \"name$i\", \"surname$i\", \"email$i\");"; ((i++)); done'

主題 dbserver1.inventory.customers 接收連續的訊息流。 現在將重新配置聯結器以捕獲 orders 表:

# 5 號航站樓
# 在抓包中新增訂單表
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" http :// localhost : 8083 / connectors / inventory - connector / config [email protected]<<EOF
{
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    “tasks.max”:“1”,
    "database.hostname": "mysql",
    "database.port": "3306",
    “database.user”:“debezium”,
    "database.password": "dbz",
    “database.server.id”:“184054”,
    "database.server.name": "dbserver1",
    "database.include.list": "庫存",
    “database.history.kafka.bootstrap.servers”:“kafka:9092”,
    “database.history.kafka.topic”:“schema-changes.inventory”,
    "table.include.list": "inventory.customers,inventory.orders",
    “只讀”:“真”,
    “incremental.snapshot.allow.schema.changes”:“真”,
    “incremental.snapshot.chunk.size”:“5000”,
    “signal.kafka.topic”:“dbz-訊號”,
    “signal.kafka.bootstrap.servers”:“kafka:9092”
}
EOF

正如預期的那樣,該表沒有訊息 orders

# Terminal 5
docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic dbserver1.inventory.orders

現在讓我們通過傳送訊號來啟動增量臨時快照。 表的快照訊息 orders 被傳遞到 dbserver1.inventory.orders 主題。 該表的訊息 customers 將不間斷地傳遞。

# Terminal 5
# Send the signal
docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-console-producer.sh \
--broker-list kafka:9092 \
--property "parse.key=true" \
--property "key.serializer=org.apache.kafka.common.serialization.StringSerializer" \
--property "value.serializer=custom.class.serialization.JsonSerializer" \
--property "key.separator=;" \
--topic dbz-signals
dbserver1;{"type":"execute-snapshot","data": {"data-collections": ["inventory.orders"], "type": "INCREMENTAL"}}

# Check messages for orders table
docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic dbserver1.inventory.orders

如果您在 orders 快照執行時修改表中的任何記錄,這將作為 read 事件或事件發出 update ,具體取決於確切的時間和順序。

作為最後一步,讓我們終止部署的系統並關閉所有終端:

# Shut down the cluster
docker-compose -f docker-compose-mysql.yaml down

結論

Debezium 是一個正在積極開發中的優秀變更資料捕獲工具,很高興成為其社群的一員。 我們很高興在 Shopify 的生產環境中使用增量快照。 如果您有類似的資料庫使用限制,請檢視只讀增量快照功能。 非常感謝我的團隊和 Debezium 團隊,沒有他們,這個專案就不會發生。