Flink 最佳實踐:TDSQL Connector 的使用(上)
作者: 姚琦,騰訊 CSIG 工程師
本文介紹瞭如何在 Oceanus 平臺使用 tdsql-subscribe-connector [1] ,從 TDSQL-MySQL 訂閱任務 [2] 建立,到 Oceanus 作業建立、最終資料驗證,實現全流程的操作指導。需要注意的是,本文預設已經建立 TDSQL-MySQL 例項和 Oceanus 叢集,並且二者在同一 VPC 下或者不同 VPC 下但網路已經打通。
上述流程圖簡要說明了使用 tdsql-subscribe-connector 時,整個資料流向情況。TDSQL 的 binlog 資料,會通過訂閱任務傳送到 Kafka(這裡的 Kafka 已經包含在訂閱任務中,無需重新建立例項),然後 Oceanus 可以通過 tdsql-subscribe-connector 接入 Kafka 的資料,由於 Kafka 中的訊息格式比較特殊,無法用常規 Kafka Connector 接入。
建立訂閱任務
建立訂閱任務可以參考 資料傳輸服務 TDSQL MySQL 資料訂閱 [3] ,在訂閱任務建立過程中,需要選擇訂閱的物件,可以選擇不同資料庫下的不同表,或者同一資料庫下的不同表,當訂閱多個表的 binlog 時,多個表中的任意一個的資料變更都會發送到 Kafka ,前提是多個表的 Schema 資訊必須是相同的。
例如,以下訂閱任務中,就指定了同一個庫下的多張表:
創 建 Oceanus SQL 作業
建立 SQL 作業
目前 tdsql-subscribe-connector 僅支援在 SQL 作業中使用,JAR 作業暫時不支援;
在 流計算 Oceanus 控制檯 [4] 的 作業管理 > 新建作業 中新建 SQL 作業 ,選擇在新建的叢集中新建作業。然後在作業的 開發除錯 > 作業引數 中新增必要的 connector,tdsql-subscribe-connector 目前需要手動上傳到依賴管理中,然後在作業引數裡引用該 JAR 包,Connector 的 JAR 包檔案可以聯絡騰訊雲 Oceanus 團隊獲取;
建立 Source 端
CREATE TABLE `DataInput` (
`id` INT,
`name` VARCHAR
) WITH (
'connector' = 'tdsql-subscribe', -- 注意選擇對應的內建 Connector
'topic' = 'topic-subs-xxx-tdsqlshard-xxx', -- 替換為訂閱任務消費的 Topic
'scan.startup.mode' = 'latest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets 的任何一種
'properties.bootstrap.servers' = 'guangzhou-kafka-2.cdb-dts.tencentcs.com.cn:321', -- 替換為您的訂閱任務 Kafka 連線地址
'properties.group.id' = 'consumer-grp-subs-xxx-group_2',
'format' = 'protobuf', -- 只能是protobuf格式
'properties.security.protocol'='SASL_PLAINTEXT', -- 認證協議
'properties.sasl.mechanism'='SCRAM-SHA-512', -- 認證方式
'properties.sasl.jaas.config'='org.apache.kafka.common.security.scram.ScramLoginModule required username="xxx" password="xxx!";' --使用者名稱和密碼
);
正常情況下,以上的 Source 端引數,除了欄位定義外,WITH 引數中需要根據具體訂閱任務填寫; 這裡列出 Source 端的相關配置項在訂閱任務的具體位置:
-
topic [資料訂閱] > [檢視訂閱詳情] > [訂閱資訊]
-
properties.bootstrap.servers [資料訂閱] > [檢視訂閱詳情] > [訂閱資訊]
-
properties.group.id [資料訂閱] > [檢視訂閱詳情] > [消費管理]
-
properties.sasl.jaas.config 只需要替換 username 和 password [資料訂閱] > [檢視訂閱詳情] > [消費管理]
建立 Sink 端
-- Logger Sink 可以將輸出資料列印到 TaskManager 的日誌中
-- 程式包下載地址:http://github.com/tencentyun/flink-hello-world/releases
-- 需要先在【程式包管理】中上傳該程式包,然後在【作業引數】中引用它
-- 參見 http://cloud.tencent.com/document/product/849/58713
CREATE TABLE logger_sink_table (
id INT PRIMARY KEY NOT ENFORCED,
name STRING
) WITH (
'connector' = 'logger',
'print-identifier' = 'DebugData'
);
為了驗證方便,這裡 Sink 端採用了 Logger ,可以把資料列印到日誌檔案中,在使用 Logger Connector 前,同樣需要下載相關的 JAR ,上傳到依賴管理,然後在作業引數中引用;
同時,為了更好地驗證日誌中資料列印情況,推薦使用 CLS ,可以更方便地在作業控制檯檢視作業執行日誌;
運算元操 作
INSERT INTO logger_sink_table SELECT * FROM DataInput;
最後,把 Source 端資料插入到 Sink 端;
結果驗證
完成 SQL 作業開發後, 釋出草稿 > 執行作業 ,然後可以在 Source 表中修改或者新增一些資料:
UPDATE `source_table11` SET `name`='test' WHERE `id`=300001;
INSERT INTO `source_table11` (`id`, `name`) VALUES (6000000, 'test');
DELETE FROM source_table11 WHERE id = 6000000
觀察 taskmanager 的日誌,可以看到 logger 打印出對應的 RowData 資訊:
DebugData-toString: +U(300001,test)
DebugData-toString: +I(6000000,test)
DebugData-toString: -D(6000000,test)
注意事項
-
TDSQL-MySQL 和 Oceanus 的 VPC 需要連通或者使用同一 VPC;
-
使用 tdsql-subscribe-connector 前,需要構建資料訂閱任務;
-
tdsql-subscribe-connector 目前只支援增量階段,沒有全量階段;
-
當訂閱任務指定了多個表時,多個表的 Schema 需要保持一致;
參考連結
[1] tdsql-subscribe-connector: http://cloud.tencent.com/document/product/849/71448
[2] 訂閱任務: http://cloud.tencent.com/document/product/571/68060
[3] 資料傳輸服務 TDSQL MySQL 資料訂閱: http://cloud.tencent.com/document/product/571/68060
[4] 流計算 Oceanus 控制檯: http://console.cloud.tencent.com/oceanus
[5] Logger: http://cloud.tencent.com/document/product/849/58713
點選文末 「閱讀原文」 ,瞭解騰訊雲流計算 Oceanus 更多資訊 ~
騰訊雲大資料
長按二維碼
關注我們
- 最佳實踐:MySQL CDC 同步資料到 ES
- 騰訊雲ES:一站式配置,TKE容器日誌採集與分析就是這麼簡單!
- 速度提升10倍,騰訊基於Iceberg的資料治理與優化實踐
- Flink 實踐教程:入門(12):元資料的使用
- Flink Metrics&REST API 介紹和原理解析
- Flink 最佳實踐:TDSQL Connector 的使用(上)
- Flink Watermark 機制及總結
- Flink 實踐教程-進階(10):自定義聚合函式(UDAF)
- Flink 實踐教程-進階(9):自定義表值函式(UDTF)
- 資料分析小結:使用流計算 Oceanus(Flink) SQL 作業進行資料型別轉換
- Flink 實踐教程-進階(8):自定義標量函式(UDF)
- 實時數倉:基於 Flink CDC 實現 Oracle 資料實時更新到 Kudu
- 基於流計算 Oceanus(Flink) CDC 做好資料整合場景
- Flink 實現 MySQL CDC 動態同步表結構
- 流計算 Oceanus | Flink JVM 記憶體超限的分析方法總結
- Flink 實踐教程-進階(6):CEP 複雜事件處理
- 騰訊雲 AI 視覺產品基於流計算 Oceanus(Flink)的計費資料去重嘗試
- 騰訊雲原生實時數倉建設實踐
- 專家帶你吃透 Flink 架構:一個新版 Connector 的實現
- 流計算 Oceanus | 巧用 Flink 構建高效能 ClickHouse 實時數倉