Flink CDC核心元件 Debezium 支援 MySQL 的只讀增量快照!
Shopify 的工程團隊最近改進了 Debezium MySQL 聯結器,以便它支援資料庫的增量快照,而無需聯結器的寫入訪問,這是將 Debezium 指向只讀副本時所必需的。 此外,Debezium MySQL 聯結器現在還允許在增量快照期間更改架構。 這篇博文解釋了這些功能的實現細節。
為什麼是隻讀的?
在 Netflix 宣佈 他們的變更資料捕獲框架 之後, Debezium 在 1.6 版本中添加了 增量快照功能 。 在 Shopify, 我們使用 Debezium 進行變更資料捕獲 (CDC) ,我們期待成為早期採用者。 此外,我們希望有一個無寫入和無鎖的解決方案。
無寫入解決方案允許從只讀副本捕獲更改,並提供最高保證 CDC 不會導致資料庫端資料損壞。
過去,我們不得不協調快照與遷移,因為模式遷移封鎖影響了其他專案的開發。 解決方案是僅在週末執行快照,因此,我們嘗試儘可能少地進行快照。 我們也看到了改進這部分流程的機會。
這篇博文深入探討了只讀增量快照實現的技術細節,包括 MySQL 聯結器中增量快照期間的無鎖模式更改處理。
增量快照
Debezium 部落格文章中 的增量快照 詳細介紹了預設實現。 該演算法利用信令表來處理兩種型別的訊號:
-
snapshot-window-open/snapshot-window-close
作為水印 -
execute-snapshot
作為觸發增量快照的一種方式
對於只讀場景,我們需要用備選方案替換這兩種型別的訊號。
顯示高水印和低水印的主狀態
該解決方案特定於 MySQL,並依賴於 全域性事務識別符號 (GTID) 。 因此,如果您從只讀副本讀取,則 需要設定 gtid_mode
並配置資料庫以保留 GTID 順序。 ON
先決條件:
gtid_mode = ON enforce_gtid_consistency = ON if replica_parallel_workers > 0 set replica_preserve_commit_order = ON
該演算法執行 SHOW MASTER STATUS 查詢以獲取在塊選擇之前和之後設定的已執行 GTID:
low watermark = executed_gtid_set high watermark = executed_gtid_set - low watermark
在只讀實現中,水印具有 GTID 集的形式,例如: 2174B383-5441-11E8-B90A-C80AA9429562:1-3, 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19
這樣的水印不會出現在 binlog 流中。 相反,該演算法將每個事件的 GTID 與記憶體中的水印進行比較。 該實現確保沒有過時的讀取,並且塊僅具有不比事件不早於低水位線的更改。
具有隻讀水印的重複資料刪除演算法
在虛擬碼中,對從 binlog 讀取的事件和通過快照塊檢索的事件進行去重的演算法如下所示:
(1) pause log event processing (2) GtidSet lwGtidSet := executed_gtid_set from SHOW MASTER STATUS (3) chunk := select next chunk from table (4) GtidSet hwGtidSet := executed_gtid_set from SHOW MASTER STATUS subtracted by lwGtidSet (5) resume log event processing inwindow := false // other steps of event processing loop while true do e := next event from changelog append e to outputbuffer if not inwindow then if not lwGtidSet.contains(e.gtid) //reached the low watermark inwindow := true else if hwGtidSet.contains(e.gtid) //haven't reached the high watermark yet if chunk contains e.key then remove e.key from chunk else //reached the high watermark for each row in chunk do append row to outputbuffer // other steps of event processing loop
水印檢查
一個數據庫事務可以更改幾行。 在這種情況下,多個 binlog 事件將具有相同的 GTID。 由於 GTID 不是唯一的,它會影響計算塊選擇視窗的邏輯。 當水印的 GTID 集不包含其 GTID 時,事件會更新視窗狀態。 在事務完成和心跳等事件之後,不會再有任何具有相同 GTID 的 binlog 事件。 對於這些事件,達到水印的上限就足以觸發視窗開啟/關閉。
圖 1. 塊選擇視窗
與預設實現一樣,重複資料刪除發生在塊選擇視窗內。 最後,演算法在高水位線之後插入一個去重塊:
圖 2. 塊重複資料刪除
包含的表沒有更新
接收二進位制日誌事件對於快照取得進展至關重要。 因此,該演算法會檢查 所有 事件的 GTID 以及未包含的表。
沒有二進位制日誌事件
MySQL 伺服器在複製連線空閒 x 秒後傳送心跳事件。 當二進位制日誌更新率較低時,只讀實現使用心跳。
心跳與最新的 binlog 事件具有相同的 GTID。 因此,對於一個心跳來說,達到高水位線的上限就足夠了。
該演算法使用 server_uuid
心跳的 GTID 的一部分從高水位線中獲取最大事務 ID。 該實現確保高水位線包含單個 server_uuid
. 未更改 server_uuid
允許避免由於心跳過早關閉視窗的情況。 以下圖為例:
圖 3. 視窗因心跳過早關閉的場景
不需要與低水位線進行心跳比較,因為視窗是否開啟並不重要。 這簡化了在高水位線和低水位線之間沒有新事件時的檢查。
水印之間沒有變化
當塊選擇期間沒有二進位制日誌事件時,二進位制日誌事件可以立即開啟和關閉一個視窗。 在這種情況下,高水印將是一個空集。 在這種情況下,快照塊會立即插入到低水位線之後,而不會進行重複資料刪除。
圖 4. 一個空的塊選擇視窗
基於 Kafka 主題的訊號
Debezium 支援通過插入訊號表觸發的臨時增量快照。 只讀的替代方法是通過特定的 Kafka 主題傳送訊號。 訊息的格式模仿信令表結構。 執行快照 Kafka 訊息包含引數
-
data-collections
- 要捕獲的表列表 -
type
- 設定為增量
例子:
Key: dbserver1 Value: {"type":"execute-snapshot","data": {"data-collections": ["inventory.orders"], "type": "INCREMENTAL"}}
MySQL 聯結器的配置有一個新 signal.kafka.topic
屬性。 主題必須有一個分割槽和刪除保留策略。
一個單獨的執行緒從 Kafka 主題中檢索訊號訊息。 Kafka 訊息的 key 需要與 中設定的聯結器名稱匹配 database.server.name
。 聯結器將使用日誌條目跳過與聯結器名稱不對應的事件。 訊息鍵檢查允許為多個聯結器重用訊號主題。
當增量快照執行時,聯結器的偏移量包括增量快照上下文。 只讀實現將 Kafka 訊號偏移量新增到增量快照上下文中。 跟蹤偏移允許它在聯結器重新啟動時不會錯過或雙重處理訊號。
但是,不需要使用 Kafka 執行只讀增量快照, execute-snapshot
寫入訊號表的預設訊號也可以工作。 展望未來,還可以設想一個用於觸發臨時增量快照的 REST API,或者通過 Debezium Server 公開,或者作為部署到 Kafka Connect 的附加 REST 資源。
增量快照期間的架構更改
Debezium MySQL 聯結器 允許在增量快照期間更改模式 。 聯結器將在增量快照期間檢測架構更改並重新選擇當前塊以避免鎖定 DDL。
請注意,不支援對主鍵的更改,如果在增量快照期間執行,可能會導致不正確的結果。
歷史化的 Debezium 聯結器,例如 MySQL 解析資料定義語言 (DDL) 事件,例如 ALTER TABLE
來自 binlog 流。 聯結器保留每個表模式的記憶體表示,並使用這些模式來生成適當的更改事件。
增量快照實現兩次使用 binlog 模式:
-
在從資料庫中選擇塊的那一刻
-
在塊插入到 binlog 流的那一刻
塊的模式必須在兩個時間都匹配二進位制日誌模式。 讓我們詳細探討一下演算法是如何實現匹配模式的。
選擇時匹配塊和二進位制日誌模式
當增量快照查詢資料庫時,行具有表的最新架構。 如果 binlog 流落後,記憶體中的 schema 可能與最新的 schema 不同。 解決方法是等待聯結器在binlog流中接收到DDL事件。 之後,聯結器可以使用快取表的結構來生成正確的增量快照事件。
使用 JDBC API 選擇快照塊。 ResultSetMetaData 儲存塊的模式。 挑戰在於 ResultSetMetaData 中的 schema 和 binlog DDL 中的 schema 具有不同的格式,因此很難確定它們是否相同。
該演算法使用兩個步驟來獲得匹配的基於 ResultSet 和基於 DDL 的模式。 首先,聯結器在低水位線和高水位線之間查詢表的模式。 一旦聯結器檢測到視窗關閉,binlog 模式就會與 ResultSetMetaData 保持同步。 之後,聯結器查詢資料庫以驗證架構是否保持不變。 如果架構已更改,則聯結器將重複該過程。
該演算法將匹配的 ResultSet 和 binlog 模式儲存在記憶體中,以允許聯結器將每個塊的模式與快取的 ResultSet 模式進行比較。
當塊的架構與快取的 ResultSet 架構不匹配時,聯結器會丟棄選定的塊。 然後演算法重複匹配 ResultSet 和 binlog 模式的驗證過程。 之後,聯結器從資料庫中重新選擇相同的塊:
圖 5. Binlog 模式與塊選擇上的塊模式不匹配
插入時匹配塊和二進位制日誌模式
DDL 事件還會觸發受影響表的塊重新讀取。 重新讀取可防止當塊具有比 binlog 流具有的視窗關閉時更舊的架構時的情況。 例如,下圖說明了架構更改之前發生的塊選擇:
圖 6. Binlog 模式與塊插入時的塊模式不匹配
演示
我們將使用標準 教程部署 來演示只讀臨時增量快照。 我們使用 MySQL 作為源資料庫。 對於此演示,您將需要開啟多個終端視窗。
一開始我們將開始部署,建立信令 Kafka 主題,並啟動聯結器:
# Terminal 1 - start the deployment # Start the deployment export DEBEZIUM_VERSION=1.9 docker-compose -f docker-compose-mysql.yaml up # Terminal 2 # Enable enforce_gtid_consistency and gtid_mode docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -p$MYSQL_ROOT_PASSWORD inventory -e "SET GLOBAL enforce_gtid_consistency=ON; SET GLOBAL gtid_mode=OFF_PERMISSIVE; SET GLOBAL gtid_mode=ON_PERMISSIVE; SET GLOBAL gtid_mode=ON;"' # Confirm the changes docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -p$MYSQL_ROOT_PASSWORD inventory -e "show global variables like \"%GTID%\";"' # Create a signaling topic docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-topics.sh \ --create \ --bootstrap-server kafka:9092 \ --partitions 1 \ --replication-factor 1 \ --topic dbz-signals # Start MySQL connector, capture only customers table and enable signaling curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @- <<EOF { "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.include.list": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "schema-changes.inventory", "table.include.list": "inventory.customers", "read.only": "true", "incremental.snapshot.allow.schema.changes": "true", "incremental.snapshot.chunk.size": "5000", "signal.kafka.topic": "dbz-signals", "signal.kafka.bootstrap.servers": "kafka:9092" } } EOF
從日誌中我們看到,根據 table.include.list
設定,只有一張表被快照, customers
:
教程-連線-1 | 2022-02-21 04:30:03,936 INFO MySQL|dbserver1|snapshot 在事務中快照 1 個表的內容 [io.debezium.relational.RelationalSnapshotChangeEventSource]
在下一步中,我們將模擬資料庫中的連續活動:
# Terminal 3 # Continuously consume messages from Debezium topic for customers table docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \ --bootstrap-server kafka:9092 \ --from-beginning \ --property print.key=true \ --topic dbserver1.inventory.customers # Terminal 4 # Modify records in the database via MySQL client docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'i=0; while true; do mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory -e "INSERT INTO customers VALUES(default, \"name$i\", \"surname$i\", \"email$i\");"; ((i++)); done'
主題 dbserver1.inventory.customers
接收連續的訊息流。 現在將重新配置聯結器以捕獲 orders
表:
# 5 號航站樓 # 在抓包中新增訂單表 curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" http :// localhost : 8083 / connectors / inventory - connector / config [email protected]<<EOF { "connector.class": "io.debezium.connector.mysql.MySqlConnector", “tasks.max”:“1”, "database.hostname": "mysql", "database.port": "3306", “database.user”:“debezium”, "database.password": "dbz", “database.server.id”:“184054”, "database.server.name": "dbserver1", "database.include.list": "庫存", “database.history.kafka.bootstrap.servers”:“kafka:9092”, “database.history.kafka.topic”:“schema-changes.inventory”, "table.include.list": "inventory.customers,inventory.orders", “只讀”:“真”, “incremental.snapshot.allow.schema.changes”:“真”, “incremental.snapshot.chunk.size”:“5000”, “signal.kafka.topic”:“dbz-訊號”, “signal.kafka.bootstrap.servers”:“kafka:9092” } EOF
正如預期的那樣,該表沒有訊息 orders
:
# Terminal 5 docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \ --bootstrap-server kafka:9092 \ --from-beginning \ --property print.key=true \ --topic dbserver1.inventory.orders
現在讓我們通過傳送訊號來啟動增量臨時快照。 表的快照訊息 orders
被傳遞到 dbserver1.inventory.orders
主題。 該表的訊息 customers
將不間斷地傳遞。
# Terminal 5 # Send the signal docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-console-producer.sh \ --broker-list kafka:9092 \ --property "parse.key=true" \ --property "key.serializer=org.apache.kafka.common.serialization.StringSerializer" \ --property "value.serializer=custom.class.serialization.JsonSerializer" \ --property "key.separator=;" \ --topic dbz-signals dbserver1;{"type":"execute-snapshot","data": {"data-collections": ["inventory.orders"], "type": "INCREMENTAL"}} # Check messages for orders table docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \ --bootstrap-server kafka:9092 \ --from-beginning \ --property print.key=true \ --topic dbserver1.inventory.orders
如果您在 orders
快照執行時修改表中的任何記錄,這將作為 read
事件或事件發出 update
,具體取決於確切的時間和順序。
作為最後一步,讓我們終止部署的系統並關閉所有終端:
# Shut down the cluster docker-compose -f docker-compose-mysql.yaml down
結論
Debezium 是一個正在積極開發中的優秀變更資料捕獲工具,很高興成為其社群的一員。 我們很高興在 Shopify 的生產環境中使用增量快照。 如果您有類似的資料庫使用限制,請檢視只讀增量快照功能。 非常感謝我的團隊和 Debezium 團隊,沒有他們,這個專案就不會發生。
- Flink資源排程模型
- 資料庫主鍵一定要自增嗎?有哪些場景不建議自增?位元組跳動使用 Flink State 的經驗分享| 醬醬的下午茶第 16 期
- 美團基於 Flink 的實時數倉平臺建設新進展
- Flink流處理API大合集:掌握所有flink流處理技術,看這一篇就夠了
- 應用實踐 | 海量資料,秒級分析!Flink Doris 構建實時數倉方案
- 應用實踐 | Apache Doris 整合 Iceberg Flink CDC 構建實時湖倉一體的聯邦查詢分析架構
- Flink的狀態管理
- 位元組跳動 Flink 單點恢復功能及 Regional CheckPoint 優化實踐
- 為了讓vite打包更順暢,我開發了這個vite外掛,音影片小白如何實現一個錄音/播放器,關於Flink框架視窗(window)函式最全解析 | 年中總結
- 美團基於 Flink 的實時數倉平臺建設新進展
- 位元組跳動使用 Flink State 的經驗分享
- 關於Flink框架視窗(window)函式最全解析
- 挑戰最全 Apache Doris 學習資料,你想要的都在這裡了!
- 自適應批作業排程器:為 Flink 批作業自動推導並行度
- 位元組跳動使用 Flink State 的經驗分享
- 位元組跳動的 Flink OLAP 作業排程和查詢執行優化實踐
- 錢大媽基於 Flink 的實時風控實踐
- 實時開發平臺建設實踐,深入釋放實時資料價值丨04期直播回顧
- Flink CDC OceanBase 全增量一體化資料整合方案
- No.0 - 流計算產品綜合洞察@以終為始