Flink 最佳實踐:TDSQL Connector 的使用(上)

語言: CN / TW / HK

作者: 姚琦,騰訊 CSIG 工程師

本文介紹瞭如何在 Oceanus 平臺使用 tdsql-subscribe-connector [1] ,從 TDSQL-MySQL 訂閱任務 [2] 建立,到 Oceanus 作業建立、最終資料驗證,實現全流程的操作指導。需要注意的是,本文預設已經建立 TDSQL-MySQL 例項和 Oceanus 叢集,並且二者在同一 VPC 下或者不同 VPC 下但網路已經打通。

上述流程圖簡要說明了使用 tdsql-subscribe-connector 時,整個資料流向情況。TDSQL 的 binlog 資料,會通過訂閱任務傳送到 Kafka(這裡的 Kafka 已經包含在訂閱任務中,無需重新建立例項),然後 Oceanus 可以通過 tdsql-subscribe-connector 接入 Kafka 的資料,由於 Kafka 中的訊息格式比較特殊,無法用常規 Kafka Connector 接入。

建立訂閱任務

建立訂閱任務可以參考 資料傳輸服務 TDSQL MySQL 資料訂閱 [3] ,在訂閱任務建立過程中,需要選擇訂閱的物件,可以選擇不同資料庫下的不同表,或者同一資料庫下的不同表,當訂閱多個表的 binlog 時,多個表中的任意一個的資料變更都會發送到 Kafka ,前提是多個表的 Schema 資訊必須是相同的。

例如,以下訂閱任務中,就指定了同一個庫下的多張表:

建 Oceanus SQL 作業

建立 SQL 作業

目前 tdsql-subscribe-connector 僅支援在 SQL 作業中使用,JAR 作業暫時不支援;

流計算 Oceanus 控制檯 [4] 的 作業管理 > 新建作業 中新建 SQL 作業 ,選擇在新建的叢集中新建作業。然後在作業的 開發除錯 > 作業引數 中新增必要的 connector,tdsql-subscribe-connector 目前需要手動上傳到依賴管理中,然後在作業引數裡引用該 JAR 包,Connector 的 JAR 包檔案可以聯絡騰訊雲 Oceanus 團隊獲取;

建立 Source 端

CREATE TABLE `DataInput` (
`id` INT,
`name` VARCHAR
) WITH (
'connector' = 'tdsql-subscribe', -- 注意選擇對應的內建 Connector
'topic' = 'topic-subs-xxx-tdsqlshard-xxx', -- 替換為訂閱任務消費的 Topic
'scan.startup.mode' = 'latest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets 的任何一種
'properties.bootstrap.servers' = 'guangzhou-kafka-2.cdb-dts.tencentcs.com.cn:321', -- 替換為您的訂閱任務 Kafka 連線地址
'properties.group.id' = 'consumer-grp-subs-xxx-group_2',
'format' = 'protobuf', -- 只能是protobuf格式
'properties.security.protocol'='SASL_PLAINTEXT', -- 認證協議
'properties.sasl.mechanism'='SCRAM-SHA-512', -- 認證方式
'properties.sasl.jaas.config'='org.apache.kafka.common.security.scram.ScramLoginModule required username="xxx" password="xxx!";' --使用者名稱和密碼
);

正常情況下,以上的 Source 端引數,除了欄位定義外,WITH 引數中需要根據具體訂閱任務填寫; 這裡列出 Source 端的相關配置項在訂閱任務的具體位置:

  • topic [資料訂閱] > [檢視訂閱詳情] > [訂閱資訊]

  • properties.bootstrap.servers [資料訂閱] > [檢視訂閱詳情] > [訂閱資訊]

  • properties.group.id [資料訂閱] > [檢視訂閱詳情] > [消費管理]

  • properties.sasl.jaas.config 只需要替換 username 和 password [資料訂閱] > [檢視訂閱詳情] > [消費管理]

建立 Sink 端

-- Logger Sink 可以將輸出資料列印到 TaskManager 的日誌中  
-- 程式包下載地址:https://github.com/tencentyun/flink-hello-world/releases
-- 需要先在【程式包管理】中上傳該程式包,然後在【作業引數】中引用它
-- 參見 https://cloud.tencent.com/document/product/849/58713
CREATE TABLE logger_sink_table (
id INT PRIMARY KEY NOT ENFORCED,
name STRING
) WITH (
'connector' = 'logger',
'print-identifier' = 'DebugData'
);

為了驗證方便,這裡 Sink 端採用了 Logger ,可以把資料列印到日誌檔案中,在使用 Logger Connector 前,同樣需要下載相關的 JAR ,上傳到依賴管理,然後在作業引數中引用;

同時,為了更好地驗證日誌中資料列印情況,推薦使用 CLS ,可以更方便地在作業控制檯檢視作業執行日誌;

運算元操

INSERT INTO logger_sink_table SELECT * FROM DataInput;

最後,把 Source 端資料插入到 Sink 端;

結果驗證

完成 SQL 作業開發後, 釋出草稿 > 執行作業 ,然後可以在 Source 表中修改或者新增一些資料:

UPDATE `source_table11` SET `name`='test' WHERE `id`=300001;
INSERT INTO `source_table11` (`id`, `name`) VALUES (6000000, 'test');
DELETE FROM source_table11 WHERE id = 6000000

觀察 taskmanager 的日誌,可以看到 logger 打印出對應的 RowData 資訊:

DebugData-toString: +U(300001,test)
DebugData-toString: +I(6000000,test)
DebugData-toString: -D(6000000,test)

注意事項

  1. TDSQL-MySQL 和 Oceanus 的 VPC 需要連通或者使用同一 VPC;

  2. 使用 tdsql-subscribe-connector 前,需要構建資料訂閱任務;

  3. tdsql-subscribe-connector 目前只支援增量階段,沒有全量階段;

  4. 當訂閱任務指定了多個表時,多個表的 Schema 需要保持一致;

參考連結

[1] tdsql-subscribe-connector: https://cloud.tencent.com/document/product/849/71448

[2] 訂閱任務: https://cloud.tencent.com/document/product/571/68060

[3] 資料傳輸服務 TDSQL MySQL 資料訂閱: https://cloud.tencent.com/document/product/571/68060

[4] 流計算 Oceanus 控制檯: https://console.cloud.tencent.com/oceanus

[5] Logger: https://cloud.tencent.com/document/product/849/58713

流計算 Oceanus  限量秒殺專享活動火爆進行中↓↓

點選文末 「閱讀原文」 ,瞭解騰訊雲流計算 Oceanus 更多資訊 ~

騰訊雲大資料

長按二維碼
關注我們