應用實踐 | Apache Doris 整合 Iceberg + Flink CDC 構建實時湖倉一體的聯邦查詢分析架構
應用實踐 | Apache Doris 整合 Iceberg + Flink CDC 構建實時湖倉一體的聯邦查詢分析架構
導讀:這是一篇非常完整全面的應用技術乾貨,手把手教你如何使用 Doris+Iceberg+Flink CDC 構建實時湖倉一體的聯邦查詢分析架構。按照本文中步驟一步步完成,完整體驗搭建操作的完整過程。
作者|Apache Doris PMC 成員 張家鋒
1.概覽
這篇教程將展示如何使用 Doris+Iceberg+Flink CDC 構建實時湖倉一體的聯邦查詢分析,Doris 1.1版本提供了Iceberg的支援,本文主要展示Doris和Iceberg怎麼使用,同時本教程整個環境是都基於偽分散式環境搭建,大家按照步驟可以一步步完成。完整體驗整個搭建操作的過程。
1.1 軟體環境
本教程的演示環境如下:
- Centos7
- Apahce doris 1.1
- Hadoop 3.3.3
- hive 3.1.3
- Fink 1.14.4
- flink-sql-connector-mysql-cdc-2.2.1
- Apache Iceberg 0.13.2
- JDK 1.8.0_311
- MySQL 8.0.29
wget http://archive.apache.org/dist/hadoop/core/hadoop-3.3.3/hadoop-3.3.3.tar.gz
wget http://archive.apache.org/dist/hive/hive-3.1.3/apache-hive-3.1.3-bin.tar.gz
wget http://dlcdn.apache.org/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.12.tgz
wget http://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-flink-runtime-1.14/0.13.2/iceberg-flink-runtime-1.14-0.13.2.jar
wget http://repository.cloudera.com/artifactory/cloudera-repos/org/apache/flink/flink-shaded-hadoop-3-uber/3.1.1.7.2.9.0-173-9.0/flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar
1.2 系統架構
我們整理架構圖如下
- 首先我們從Mysql資料中使用Flink 通過 Binlog完成資料的實時採集
- 然後再Flink 中建立 Iceberg 表,Iceberg的元資料儲存在hive裡
- 最後我們在Doris中建立Iceberg外表
- 在通過Doris 統一查詢入口完成對Iceberg裡的資料進行查詢分析,供前端應用呼叫,這裡iceberg外表的資料可以和Doris內部資料或者Doris其他外部資料來源的資料進行關聯查詢分析
Doris湖倉一體的聯邦查詢架構如下:
- Doris 通過 ODBC 方式支援:MySQL,Postgresql,Oracle ,SQLServer
- 同時支援 Elasticsearch 外表
- 1.0版本支援Hive外表
- 1.1版本支援Iceberg外表
- 1.2版本支援Hudi 外表
2.環境安裝部署
2.1 安裝Hadoop、Hive
tar zxvf hadoop-3.3.3.tar.gz
tar zxvf apache-hive-3.1.3-bin.tar.gz
配置系統環境變數
export HADOOP_HOME=/data/hadoop-3.3.3
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_HDFS_HOME=$HADOOP_HOME
export HIVE_HOME=/data/hive-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin:$HIVE_HOME/bin:$HIVE_HOME/conf
2.2 配置hdfs
2.2.1 core-site.xml
vi etc/hadoop/core-site.xml
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>
2.2.2 hdfs-site.xml
vi etc/hadoop/hdfs-site.xml
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>/data/hdfs/namenode</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>/data/hdfs/datanode</value>
</property>
</configuration>
2.2.3 修改Hadoop啟動指令碼
sbin/start-dfs.sh
sbin/stop-dfs.sh
在檔案開始加上下面的內容
HDFS_DATANODE_USER=root
HADOOP_SECURE_DN_USER=hdfs
HDFS_NAMENODE_USER=root
HDFS_SECONDARYNAMENODE_USER=root
sbin/start-yarn.sh
sbin/stop-yarn.sh
在檔案開始加上下面的內容
YARN_RESOURCEMANAGER_USER=root
HADOOP_SECURE_DN_USER=yarn
YARN_NODEMANAGER_USER=root
2.3 配置yarn
這裡我改變了Yarn的一些埠,因為我是單機環境和Doris 的一些埠衝突。你可以不啟動yarn
vi etc/hadoop/yarn-site.xml
<property>
<name>yarn.resourcemanager.address</name>
<value>jiafeng-test:50056</value>
</property>
<property>
<name>yarn.resourcemanager.scheduler.address</name>
<value>jiafeng-test:50057</value>
</property>
<property>
<name>yarn.resourcemanager.resource-tracker.address</name>
<value>jiafeng-test:50058</value>
</property>
<property>
<name>yarn.resourcemanager.admin.address</name>
<value>jiafeng-test:50059</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.address</name>
<value>jiafeng-test:9090</value>
</property>
<property>
<name>yarn.nodemanager.localizer.address</name>
<value>0.0.0.0:50060</value>
</property>
<property>
<name>yarn.nodemanager.webapp.address</name>
<value>0.0.0.0:50062</value>
</property>
vi etc/hadoop/mapred-site.xm
<property>
<name>mapreduce.jobhistory.address</name>
<value>0.0.0.0:10020</value>
</property>
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>0.0.0.0:19888</value>
</property>
<property>
<name>mapreduce.shuffle.port</name>
<value>50061</value>
</property>
2.2.4 啟動hadoop
sbin/start-all.sh
2.4 配置Hive
2.4.1 建立hdfs目錄
hdfs dfs -mkdir -p /user/hive/warehouse
hdfs dfs -mkdir /tmp
hdfs dfs -chmod g+w /user/hive/warehouse
hdfs dfs -chmod g+w /tmp
2.4.2 配置hive-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://localhost:3306/hive?createDatabaseIfNotExist=true</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>MyNewPass4!</value>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
<description>location of default database for the warehouse</description>
</property>
<property>
<name>hive.metastore.uris</name>
<value/>
<description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
</property>
<property>
<name>javax.jdo.PersistenceManagerFactoryClass</name>
<value>org.datanucleus.api.jdo.JDOPersistenceManagerFactory</value>
</property>
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
<property>
<name>datanucleus.schema.autoCreateAll</name>
<value>true</value>
</property>
</configuration>
2.4.3 配置 hive-env.sh
加入一下內容
HADOOP_HOME=/data/hadoop-3.3.3
2.4.4 hive元資料初始化
schematool -initSchema -dbType mysql
2.4.5 啟動hive metaservice
後臺執行
nohup bin/hive --service metaservice 1>/dev/null 2>&1 &
驗證
lsof -i:9083
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 20700 root 567u IPv6 54605348 0t0 TCP *:emc-pp-mgmtsvc (LISTEN)
2.5 安裝MySQL
具體請參照這裡:
使用 Flink CDC 實現 MySQL 資料實時入 Apache Doris
2.5.1 建立MySQL資料庫表並初始化資料
CREATE DATABASE demo;
USE demo;
CREATE TABLE userinfo (
id int NOT NULL AUTO_INCREMENT,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone_number VARCHAR(512),
email VARCHAR(255),
PRIMARY KEY (`id`)
)ENGINE=InnoDB ;
INSERT INTO userinfo VALUES (10001,'user_110','Shanghai','13347420870', NULL);
INSERT INTO userinfo VALUES (10002,'user_111','xian','13347420870', NULL);
INSERT INTO userinfo VALUES (10003,'user_112','beijing','13347420870', NULL);
INSERT INTO userinfo VALUES (10004,'user_113','shenzheng','13347420870', NULL);
INSERT INTO userinfo VALUES (10005,'user_114','hangzhou','13347420870', NULL);
INSERT INTO userinfo VALUES (10006,'user_115','guizhou','13347420870', NULL);
INSERT INTO userinfo VALUES (10007,'user_116','chengdu','13347420870', NULL);
INSERT INTO userinfo VALUES (10008,'user_117','guangzhou','13347420870', NULL);
INSERT INTO userinfo VALUES (10009,'user_118','xian','13347420870', NULL);
2.6 安裝 Flink
tar zxvf flink-1.14.4-bin-scala_2.12.tgz
然後需要將下面的依賴拷貝到Flink安裝目錄下的lib目錄下,具體的依賴的lib檔案如下:
下面將幾個Hadoop和Flink裡沒有的依賴下載地址放在下面
wget http://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jar
wget http://repo1.maven.org/maven2/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jar
wget http://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-flink-runtime-1.14/0.13.2/iceberg-flink-runtime-1.14-0.13.2.jar
wget http://repository.cloudera.com/artifactory/cloudera-repos/org/apache/flink/flink-shaded-hadoop-3-uber/3.1.1.7.2.9.0-173-9.0/flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar
其他的:
hadoop-3.3.3/share/hadoop/common/lib/commons-configuration2-2.1.1.jar
hadoop-3.3.3/share/hadoop/common/lib/commons-logging-1.1.3.jar
hadoop-3.3.3/share/hadoop/tools/lib/hadoop-archive-logs-3.3.3.jar
hadoop-3.3.3/share/hadoop/common/lib/hadoop-auth-3.3.3.jar
hadoop-3.3.3/share/hadoop/common/lib/hadoop-annotations-3.3.3.jar
hadoop-3.3.3/share/hadoop/common/hadoop-common-3.3.3.jar
adoop-3.3.3/share/hadoop/hdfs/hadoop-hdfs-3.3.3.jar
hadoop-3.3.3/share/hadoop/client/hadoop-client-api-3.3.3.jar
hive-3.1.3/lib/hive-exec-3.1.3.jar
hive-3.1.3/lib/hive-metastore-3.1.3.jar
hive-3.1.3/lib/hive-hcatalog-core-3.1.3.jar
2.6.1 啟動Flink
bin/start-cluster.sh
啟動後的介面如下:
2.6.2 進入 Flink SQL Client
bin/sql-client.sh embedded
開啟 checkpoint,每隔3秒做一次 checkpoint
Checkpoint 預設是不開啟的,我們需要開啟 Checkpoint 來讓 Iceberg 可以提交事務。 並且,mysql-cdc 在 binlog 讀取階段開始前,需要等待一個完整的 checkpoint 來避免 binlog 記錄亂序的情況。
注意:
這裡是演示環境,checkpoint的間隔設定比較短,線上使用,建議設定為3-5分鐘一次checkpoint。
Flink SQL> SET execution.checkpointing.interval = 3s;
[INFO] Session property has been set.
2.6.3 建立Iceberg Catalog
CREATE CATALOG hive_catalog WITH (
'type'='iceberg',
'catalog-type'='hive',
'uri'='thrift://localhost:9083',
'clients'='5',
'property-version'='1',
'warehouse'='hdfs://localhost:8020/user/hive/warehouse'
);
檢視catalog
Flink SQL> show catalogs;
+-----------------+
| catalog name |
+-----------------+
| default_catalog |
| hive_catalog |
+-----------------+
2 rows in set
2.6.4 建立 Mysql CDC 表
CREATE TABLE user_source (
database_name STRING METADATA VIRTUAL,
table_name STRING METADATA VIRTUAL,
`id` DECIMAL(20, 0) NOT NULL,
name STRING,
address STRING,
phone_number STRING,
email STRING,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = 'MyNewPass4!',
'database-name' = 'demo',
'table-name' = 'userinfo'
);
查詢CDC表:
select * from user_source;
2.6.5 建立Iceberg表
---檢視catalog
show catalogs;
---使用catalog
use catalog hive_catalog;
--建立資料庫
CREATE DATABASE iceberg_hive;
--使用資料庫
use iceberg_hive;
2.6.5.1 建立表
CREATE TABLE all_users_info (
database_name STRING,
table_name STRING,
`id` DECIMAL(20, 0) NOT NULL,
name STRING,
address STRING,
phone_number STRING,
email STRING,
PRIMARY KEY (database_name, table_name, `id`) NOT ENFORCED
) WITH (
'catalog-type'='hive'
);
從CDC表裡插入資料到Iceberg表裡
use catalog default_catalog;
insert into hive_catalog.iceberg_hive.all_users_info select * from user_source;
在web介面可以看到任務的執行情況
然後停掉任務,我們去查詢iceberg表
select * from hive_catalog.iceberg_hive.all_users_info
可以看到下面的結果
我們去hdfs上可以看到hive目錄下的資料及對應的元資料
我們也可以通過Hive建好Iceberg表,然後通過Flink將資料插入到表裡
下載Iceberg Hive執行依賴
wget http://repo1.maven.org/maven2/org/apache/iceberg/iceberg-hive-runtime/0.13.2/iceberg-hive-runtime-0.13.2.jar
在hive shell下執行:
SET engine.hive.enabled=true;
SET iceberg.engine.hive.enabled=true;
SET iceberg.mr.catalog=hive;
add jar /path/to/iiceberg-hive-runtime-0.13.2.jar;
建立表
CREATE EXTERNAL TABLE iceberg_hive(
`id` int,
`name` string)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://localhost:8020/user/hive/warehouse/iceber_db/iceberg_hive'
TBLPROPERTIES (
'iceberg.mr.catalog'='hadoop',
'iceberg.mr.catalog.hadoop.warehouse.location'='hdfs://localhost:8020/user/hive/warehouse/iceber_db/iceberg_hive'
);
然後再Flink SQL Client下執行下面語句將資料插入到Iceber表裡
INSERT INTO hive_catalog.iceberg_hive.iceberg_hive values(2, 'c');
INSERT INTO hive_catalog.iceberg_hive.iceberg_hive values(3, 'zhangfeng');
查詢這個表
select * from hive_catalog.iceberg_hive.iceberg_hive
可以看到下面的結果
3. Doris 查詢 Iceberg
Apache Doris 提供了 Doris 直接訪問 Iceberg 外部表的能力,外部表省去了繁瑣的資料匯入工作,並藉助 Doris 本身的 OLAP 的能力來解決 Iceberg 表的資料分析問題:
- 支援 Iceberg 資料來源接入Doris
- 支援 Doris 與 Iceberg 資料來源中的表聯合查詢,進行更加複雜的分析操作
3.1安裝Doris
這裡我們不在詳細講解Doris的安裝,如果你不知道怎麼安裝Doris請參照官方文件:快速入門
3.2 建立Iceberg外表
CREATE TABLE `all_users_info`
ENGINE = ICEBERG
PROPERTIES (
"iceberg.database" = "iceberg_hive",
"iceberg.table" = "all_users_info",
"iceberg.hive.metastore.uris" = "thrift://localhost:9083",
"iceberg.catalog.type" = "HIVE_CATALOG"
);
引數說明:
-
ENGINE 需要指定為 ICEBERG
-
PROPERTIES 屬性:
iceberg.hive.metastore.uris
:Hive Metastore 服務地址iceberg.database
:掛載 Iceberg 對應的資料庫名iceberg.table
:掛載 Iceberg 對應的表名,掛載 Iceberg database 時無需指定。iceberg.catalog.type
:Iceberg 中使用的 catalog 方式,預設為HIVE_CATALOG
,當前僅支援該方式,後續會支援更多的 Iceberg catalog 接入方式。
mysql> CREATE TABLE `all_users_info`
-> ENGINE = ICEBERG
-> PROPERTIES (
-> "iceberg.database" = "iceberg_hive",
-> "iceberg.table" = "all_users_info",
-> "iceberg.hive.metastore.uris" = "thrift://localhost:9083",
-> "iceberg.catalog.type" = "HIVE_CATALOG"
-> );
Query OK, 0 rows affected (0.23 sec)
mysql> select * from all_users_info;
+---------------+------------+-------+----------+-----------+--------------+-------+
| database_name | table_name | id | name | address | phone_number | email |
+---------------+------------+-------+----------+-----------+--------------+-------+
| demo | userinfo | 10004 | user_113 | shenzheng | 13347420870 | NULL |
| demo | userinfo | 10005 | user_114 | hangzhou | 13347420870 | NULL |
| demo | userinfo | 10002 | user_111 | xian | 13347420870 | NULL |
| demo | userinfo | 10003 | user_112 | beijing | 13347420870 | NULL |
| demo | userinfo | 10001 | user_110 | Shanghai | 13347420870 | NULL |
| demo | userinfo | 10008 | user_117 | guangzhou | 13347420870 | NULL |
| demo | userinfo | 10009 | user_118 | xian | 13347420870 | NULL |
| demo | userinfo | 10006 | user_115 | guizhou | 13347420870 | NULL |
| demo | userinfo | 10007 | user_116 | chengdu | 13347420870 | NULL |
+---------------+------------+-------+----------+-----------+--------------+-------+
9 rows in set (0.18 sec)
3.3 同步掛載
當 Iceberg 表 Schema 發生變更時,可以通過 REFRESH
命令手動同步,該命令會將 Doris 中的 Iceberg 外表刪除重建。
-- 同步 Iceberg 表
REFRESH TABLE t_iceberg;
-- 同步 Iceberg 資料庫
REFRESH DATABASE iceberg_test_db;
3.4 Doris 和 Iceberg 資料型別對應關係
支援的 Iceberg 列型別與 Doris 對應關係如下表:
ICEBERG | DORIS | 描述 |
---|---|---|
BOOLEAN | BOOLEAN | |
INTEGER | INT | |
LONG | BIGINT | |
FLOAT | FLOAT | |
DOUBLE | DOUBLE | |
DATE | DATE | |
TIMESTAMP | DATETIME | Timestamp 轉成 Datetime 會損失精度 |
STRING | STRING | |
UUID | VARCHAR | 使用 VARCHAR 來代替 |
DECIMAL | DECIMAL | |
TIME | - | 不支援 |
FIXED | - | 不支援 |
BINARY | - | 不支援 |
STRUCT | - | 不支援 |
LIST | - | 不支援 |
MAP | - | 不支援 |
3.5 注意事項
- Iceberg 表 Schema 變更不會自動同步,需要在 Doris 中通過
REFRESH
命令同步 Iceberg 外表或資料庫。 - 當前預設支援的 Iceberg 版本為 0.12.0,0.13.x,未在其他版本進行測試。後續後支援更多版本。
3.6 Doris FE 配置
下面幾個配置屬於 Iceberg 外表系統級別的配置,可以通過修改 fe.conf
來配置,也可以通過 ADMIN SET CONFIG
來配置。
-
iceberg_table_creation_strict_mode
建立 Iceberg 表預設開啟 strict mode。 strict mode 是指對 Iceberg 表的列型別進行嚴格過濾,如果有 Doris 目前不支援的資料型別,則建立外表失敗。
-
iceberg_table_creation_interval_second
自動建立 Iceberg 表的後臺任務執行間隔,預設為 10s。
-
max_iceberg_table_creation_record_size
Iceberg 表建立記錄保留的最大值,預設為 2000. 僅針對建立 Iceberg 資料庫記錄。
4. 總結
這裡Doris On Iceberg我們只演示了Iceberg單表的查詢,你還可以聯合Doris的表,或者其他的ODBC外表,Hive外表,ES外表等進行聯合查詢分析,通過Doris對外提供統一的查詢分析入口。
自此我們完整從搭建Hadoop,hive、flink 、Mysql、Doris 及Doris On Iceberg的使用全部介紹完了,Doris朝著資料倉庫和資料融合的架構演進,支援湖倉一體的聯邦查詢,給我們的開發帶來更多的便利,更高效的開發,省去了很多資料同步的繁瑣工作,快快來體驗吧。 最後,歡迎更多的開源技術愛好者加入 Apache Doris 社群,攜手成長,共建社群生態。
SelectDB 是一家開源技術公司,致力於為 Apache Doris 社群提供一個由全職工程師、產品經理和支援工程師組成的團隊,繁榮開源社群生態,打造實時分析型資料庫領域的國際工業界標準。基於 Apache Doris 研發的新一代雲原生實時數倉 SelectDB,運行於多家雲上,為使用者和客戶提供開箱即用的能力。
相關連結:
SelectDB 官方網站:
http://selectdb.com (We Are Coming Soon)
Apache Doris 官方網站:
Apache Doris Github:
http://github.com/apache/doris
Apache Doris 開發者郵件組:
- 技術乾貨|如何將 Pulsar 資料快速且無縫接入 Apache Doris
- Apache Doris 1.1 特性揭祕:Flink 實時寫入如何兼顧高吞吐和低延時
- 技術解析|Doris Connector 結合 Flink CDC 實現 MySQL 分庫分表 Exactly Once精準接入
- 知乎基於 Apache Doris 的 DMP 平臺架構建設實踐|萬字長文詳解
- 應用實踐 | 數倉體系效率全面提升!同程數科基於 Apache Doris 的資料倉庫建設
- 應用實踐 | 蜀海供應鏈基於 Apache Doris 的資料中臺建設
- 應用實踐 | Apache Doris 在網易互娛的應用實踐
- 應用實踐 | 10 億資料秒級關聯,貨拉拉基於 Apache Doris 的 OLAP 體系演進(附 PPT 下載)
- 應用實踐 | 海量資料,秒級分析!Flink Doris 構建實時數倉方案
- 應用實踐 | Apache Doris 整合 Iceberg Flink CDC 構建實時湖倉一體的聯邦查詢分析架構
- 挑戰最全 Apache Doris 學習資料,你想要的都在這裡了!
- 官宣!Apache Doris 從 Apache 基金會畢業,正式成為 Apache 頂級專案!
- 應用實踐 | Apache Doris 在網易互娛的應用實踐
- 應用實踐 | 物易雲通基於 Apache Doris 的實時資料倉庫建設
- 應用實踐|Lifewit 資料平臺基於Apache Doris的建設實踐