實戰 kafka connector 與 debezium mysql

語言: CN / TW / HK

本文已參與「新人創作禮」活動,一起開啟掘金創作之路。

kafka connector

Kafka Connect 是一個可擴充套件、可靠的在Kafka和其他系統之間流傳輸的資料工具。它可以通過connectors(聯結器)簡單、快速的將大集合資料匯入和匯出kafka。

1. 安裝配置

1.1 下載並配置kafka

我們使用的是kafka_2.13-2.8.0.tgz 您可以自行下載

解壓到目錄 D:\soft\kafka_2.13-2.8.0 這是我的根目錄

修改 D:\soft\kafka_2.13-2.8.0\config 資料夾以下檔案內容

zookeeper.properties 需要修改的地方:

textile dataDir=D:/soft/kafka_2.13-2.8.0/data/zookeeper

server.properties 需要修改的地方:

txt log.dirs=D:/soft/kafka_2.13-2.8.0/data/kafka-logs

在D:\soft\kafka_2.13-2.8.0\bin\windows 下我們新建一個 a.bat 檔案內容如下

batch start zookeeper-server-start.bat ../../config/zookeeper.properties rem -E 如果sleep命令不可用可以用 ping -n 10 127.0.0.1>nul sleep 5 start kafka-server-start.bat ../../config/server.properties

這個命令會先啟動 zookeeper 等zookeeper啟動5秒後 再啟動 kafka

1.2 下載圖形管理介面 kafka-map

官方 kafka-map: 一個美觀簡潔且強大的kafka web管理工具。

下載地址 https://github.com/dushixiang/kafka-map/releases/latest/download/kafka-map.tgz

解壓到 D:\soft\kafka_2.13-2.8.0\kafka-map

在 D:\soft\kafka_2.13-2.8.0\kafka-map 新建 startKafkaMap.bat 內容如下:

batch java -jar kafka-map.jar

1.3 啟動驗證

啟動 a.bat

啟動 startKafkaMap.bat

訪問 http://localhost:8080/ ,賬戶和密碼預設都是 admin

image.png 看到如下頁面就是成功了

image.png

新增叢集

image.png

image.png

2. kafka connector入門樣例

我們先執行一下官方提供的一個例子,這個例子中會把server.properties 用FileStreamSource 讀取然後儲存到kafka 佇列

然後用FileStreamSink 消費佇列並儲存到 server.properties1

2.1 配置connector

D:\soft\kafka_2.13-2.8.0\config\connect-distributed.properties

textile bootstrap.servers=localhost:9092

2.2 啟動 connector

在 D:\soft\kafka_2.13-2.8.0\bin\windows 目錄下先建立一個aConnect.bat 內容如下:

textile start zookeeper-server-start.bat ../../config/zookeeper.properties sleep 5 start kafka-server-start.bat ../../config/server.properties sleep 5 start connect-distributed.bat ../../config/connect-distributed.properties

2.3 檢查環境

傳送 GET 請求到 http://localhost:8083/ 檢視狀態

image.png

檢視所有外掛 http://localhost:8083/connector-plugins

image.png

2.3 配置 file connector source 端

FileStreamSource 會把 "D:/soft/kafka_2.13-2.8.0/config/server.properties" 這個檔案逐行讀取後儲存到 "kafka-config-topic" 這個佇列中

http://localhost:8083/connectors

json { "name":"load-kafka-config", "config":{ "connector.class":"FileStreamSource", "file":"D:/soft/kafka_2.13-2.8.0/config/server.properties", "topic":"kafka-config-topic" } }

介面返回結果截圖如下

image.png

檢視狀態

http://localhost:8083/connectors/load-kafka-config/status

image.png

在kafka-map中 點選箭頭所指內容

image.png

image.png

image.png

檢視topic 內容

image.png

2.4 配置 file connector sink端

FileStreamSink 會消費 "kafka-config-topic" 這個佇列並把 內容儲存到 "D:/soft/kafka_2.13-2.8.0/config/server.properties1"

http://localhost:8083/connectors

json { "name":"dump-kafka-config", "config":{ "connector.class":"FileStreamSink", "file":"D:/soft/kafka_2.13-2.8.0/config/server.properties1", "topics":"kafka-config-topic" } }

介面返回結果如下;

image.png

執行完成後會發現 D:\soft\kafka_2.13-2.8.0\config 目錄下 多了一個server.properties1 檔案,而且檔案內容和 server.properties 一樣

image.png

2.5 檢視當前活躍的 connector

http://localhost:8083/connectors

image.png

2.6 刪除connector

DELETE http://localhost:8083/connectors/load-kafka-config

DELETE http://localhost:8083/connectors/dump-kafka-config

檢視 活躍的connetor,發現結果已經為空

image.png

3. 整合 debezium mysql connector

在這個例子中 我們會用 debezium mysql connector 監控mysql binlog 並把變更內容儲存到kafka 佇列中

3.1 mysql 準備

為了簡單專門用於複製的使用者我們就不建立了,我們直接用root 使用者

必須配置以下引數

config [mysqld] server-id = 8023 port=8023 log-bin=D:/soft/mysql/mysql8023/data1/8023/binlog/mysql-bin transaction-isolation= READ-COMMITTED binlog_format = ROW gtid_mode =ON enforce_gtid_consistency = ON default-time-zone = '+8:00'

還有一些可選的優化引數請參考官方文件

https://debezium.io/documentation/reference/2.0/connectors/mysql.html#setting-up-mysql

準備好後就可以啟動mysql了,在此我們用的mysql版本為8.0.29

準備一個測試庫和一個測試表

``sql CREATE DATABASEtest1` ;

CREATE TABLE demo ( id INT(10) NOT NULL, name VARCHAR(50) NULL DEFAULT NULL , PRIMARY KEY (id) USING BTREE );

-- 插入測試資料 INSERT INTO demo (id, name) VALUES (111, '222'); ```

3.2 debezium 準備

下載 debezium 整合kafka connect 外掛

https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.0.0.Alpha1/debezium-connector-mysql-2.0.0.Alpha1-plugin.tar.gz

把 檔案debezium-connector-mysql-2.0.0.Alpha1-plugin.tar.gz 解壓到 D:\soft\kafka_2.13-2.8.0\plugin

image.png

connect-distributed.properties 配置如下引數,注意路徑不要包含 debezium-connector-mysql

config plugin.path=D:/soft/kafka_2.13-2.8.0/plugin/

新增完成後需要重啟kafka 和 connect

檢視啟動日誌是否已經載入外掛

txt [2022-05-11 16:09:48,408] INFO Registered loader: PluginClassLoader{pluginLocation=file:/D:/soft/kafka_2.13-2.8.0/plugin/debezium-connector-mysql/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:269) [2022-05-11 16:09:48,410] INFO Added plugin 'io.debezium.connector.mysql.MySqlConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)

檢視可用的外掛

image.png

3.3 安裝connnector

傳送 POST 給 http://localhost:8083/connectors

請求正文為 connector 配置引數

json { "name": "conn-8023", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "localhost", "database.port": "8023", "database.user": "root", "database.password": "123456", "database.server.id": "8023", "database.server.name": "local8023", "database.include.list": "test1", "database.history.kafka.bootstrap.servers": "localhost:9092", "database.history.kafka.topic": "dbhistory.local8023", "include.schema.changes": "true" } }

參考圖片

image.png

3.4 檢視執行狀態

image.png

用kafka-map 檢視資料

image.png

檢視 local8023.test1.demo 的資料

json { "schema": { ... }, "payload": { "before": null, "after": { "id": 111, "name": "222" }, "source": { ... }, "op": "r", "ts_ms": 1652258099933, "transaction": null } }

再執行一些sql測試一下

sql -- 插入測試資料 INSERT INTO `demo` (`id`, `name`) VALUES (222, '新資料222'); update demo set name='111修改後' where id=111; DELETE FROM demo WHERE id=222;

插入事件

json "payload": { "before": null, "after": { "id": 222, "name": "新資料222" }, "source": { ... }, "op": "c", "ts_ms": 1652258788884, "transaction": null }

修改事件

json "payload": { "before": { "id": 111, "name": "222" }, "after": { "id": 111, "name": "111修改後" }, "source": { ... }, "op": "u", "ts_ms": 1652258788887, "transaction": null }

刪除事件

json "payload": { "before": { "id": 222, "name": "新資料222" }, "after": null, "source": { ... }, "op": "d", "ts_ms": 1652258985267, "transaction": null }

刪除後的墓碑事件

image.png