flink-cdc同步mysql資料到hive

語言: CN / TW / HK

本文首發於我的個人部落格網站等待下一個秋-Flink

什麼是CDC?

CDC是(Change Data Capture 變更資料獲取)的簡稱。核心思想是,監測並捕獲資料庫的變動(包括資料 或 資料表的插入INSERT、更新UPDATE、刪除DELETE等),將這些變更按發生的順序完整記錄下來,寫入到訊息中介軟體中以供其他服務進行訂閱及消費。

1. 環境準備

  • mysql

  • Hive

  • flink 1.13.5 on yarn

    說明:如果沒有安裝hadoop,那麼可以不用yarn,直接用flink standalone環境吧。

2. 下載下列依賴包

下面兩個地址下載flink的依賴包,放在lib目錄下面。

  1. flink-sql-connector-hive-2.2.0_2.11-1.13.5.jar

如果你的Flink是其它版本,可以來 這裡 下載。

說明:我hive版本是2.1.1,為啥這裡我選擇版本號是2.2.0呢,這是官方文件給出的版本對應關係:

Metastore version Maven dependency SQL Client JAR
1.0.0 - 1.2.2 flink-sql-connector-hive-1.2.2 Download
2.0.0 - 2.2.0 flink-sql-connector-hive-2.2.0 Download
2.3.0 - 2.3.6 flink-sql-connector-hive-2.3.6 Download
3.0.0 - 3.1.2 flink-sql-connector-hive-3.1.2 Download

官方文件地址在 這裡 ,可以自行檢視。

3. 啟動flink-sql client

1) 先在yarn上面啟動一個application,進入flink13.5目錄,執行:

bin/yarn-session.sh -d -s 2 -jm 1024 -tm 2048 -qu root.sparkstreaming -nm flink-cdc-hive

2) 進入flink sql命令列

bin/sql-client.sh embedded -s flink-cdc-hive

4. 操作Hive

1) 首選建立一個catalog

CREATE CATALOG hive_catalog WITH (
    'type' = 'hive',
    'hive-conf-dir' = '/etc/hive/conf.cloudera.hive'
);

這裡需要注意:hive-conf-dir是你的hive配置檔案地址,裡面需要有hive-site.xml這個主要的配置檔案,你可以從hive節點複製那幾個配置檔案到本臺機器上面。

2) 查詢

此時我們應該做一些常規DDL操作,驗證配置是否有問題:

use catalog hive_catalog;
show databases;

隨便查詢一張表

use test
show tables;
select * from people;

可能會報錯:

把hadoop-mapreduce-client-core-3.0.0.jar放到flink的Lib目錄下,這是我的,實際要根據你的hadoop版本對應選擇。

注意:很關鍵,把這個jar包放到Lib下面後,需要重啟application,然後重新用yarn-session啟動一個application,因為我發現好像有快取,把這個application kill 掉,重啟才行:

然後,資料可以查詢了,查詢結果:

5. mysql資料同步到hive

mysql資料無法直接在flink sql匯入hive,需要分成兩步:

  1. mysql資料同步kafka;
  2. kafka資料同步hive;

至於mysql資料增量同步到kafka,前面有文章分析,這裡不在概述;重點介紹kafka資料同步到hive。

1) 建表跟kafka關聯繫結:

前面mysql同步到kafka,在flink sql裡面建表,connector='upsert-kafka',這裡有區別:

CREATE TABLE product_view_mysql_kafka_parser(
`id` int,
`user_id` int,
`product_id` int,
`server_id` int,
`duration` int,
`times` string,
`time` timestamp
) WITH (
 'connector' = 'kafka',
 'topic' = 'flink-cdc-kafka',
 'properties.bootstrap.servers' = 'kafka-001:9092',
 'scan.startup.mode' = 'earliest-offset',
 'format' = 'json'
);

2) 建一張hive表

建立hive需要指定 SET table.sql-dialect=hive; ,否則flink sql 命令列無法識別這個建表語法。為什麼需要這樣,可以看看這個文件 Hive 方言

-- 建立一個catalag使用者hive操作
CREATE CATALOG hive_catalog WITH (
    'type' = 'hive',
    'hive-conf-dir' = '/etc/hive/conf.cloudera.hive'
);
use catalog hive_catalog;

-- 可以看到我們的hive裡面有哪些資料庫
show databases;
use test;
show tables;

上面我們可以現在看看hive裡面有哪些資料庫,有哪些表;接下來建立一張hive表:

CREATE TABLE product_view_kafka_hive_cdc (
  `id` int,
`user_id` int,
`product_id` int,
`server_id` int,
`duration` int,
`times` string,
`time` timestamp
) STORED AS parquet TBLPROPERTIES (
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='0S',
  'sink.partition-commit.policy.kind'='metastore,success-file',
  'auto-compaction'='true',
  'compaction.file-size'='128MB'
);

然後做資料同步:

insert into hive_catalog.test.product_view_kafka_hive_cdc
select * 
from 
default_catalog.default_database.product_view_mysql_kafka_parser;

注意:這裡指定表名,我用的是catalog.database.table,這種格式,因為這是兩個不同的庫,需要明確指定catalog - database - table。

網上還有其它方案,關於mysql實時增量同步到hive:

網上看到一篇寫的 實時數倉架構方案 ,覺得還可以:

參考資料

http://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/hive/hive_dialect/